From 4ed8ee38f5954b3c193818bc98a34f3659095610 Mon Sep 17 00:00:00 2001 From: Thomas Stadelmann Date: Tue, 10 May 2022 15:52:37 +0200 Subject: [PATCH 1/8] Prevent losing names of utilized components when loaded from config --- haystack/nodes/base.py | 4 +++- haystack/pipelines/base.py | 2 +- haystack/pipelines/ray.py | 2 +- test/test_pipeline.py | 1 + 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/haystack/nodes/base.py b/haystack/nodes/base.py index 06c5ba26b5..92f5f4b02d 100644 --- a/haystack/nodes/base.py +++ b/haystack/nodes/base.py @@ -119,15 +119,17 @@ def get_subclass(cls, component_type: str) -> Type[BaseComponent]: return subclass @classmethod - def _create_instance(cls, component_type: str, component_params: Dict[str, Any]): + def _create_instance(cls, component_type: str, component_params: Dict[str, Any], name: Optional[str] = None): """ Returns an instance of the given subclass of BaseComponent. :param component_type: name of the component class to load. :param component_params: parameters to pass to the __init__() for the component. + :param name: name of the component instance """ subclass = cls.get_subclass(component_type) instance = subclass(**component_params) + instance.name = name return instance @abstractmethod diff --git a/haystack/pipelines/base.py b/haystack/pipelines/base.py index 4e508758ea..f7f1dc9814 100644 --- a/haystack/pipelines/base.py +++ b/haystack/pipelines/base.py @@ -1277,7 +1277,7 @@ def _load_or_get_component(cls, name: str, definitions: dict, components: dict): value ] # substitute reference (string) with the component object. - component_instance = BaseComponent._create_instance(component_type, component_params) + component_instance = BaseComponent._create_instance(component_type=component_type, component_params=component_params, name=name) components[name] = component_instance return component_instance diff --git a/haystack/pipelines/ray.py b/haystack/pipelines/ray.py index 57092edf23..d9f669d81c 100644 --- a/haystack/pipelines/ray.py +++ b/haystack/pipelines/ray.py @@ -333,6 +333,6 @@ def load_from_pipeline_config(pipeline_config: dict, component_name: str): component_params[key] = _RayDeploymentWrapper.load_from_pipeline_config(pipeline_config, value) component_instance = BaseComponent._create_instance( - component_type=component_config["type"], component_params=component_params + component_type=component_config["type"], component_params=component_params, name=component_name ) return component_instance diff --git a/test/test_pipeline.py b/test/test_pipeline.py index a708edfec3..96935b03df 100644 --- a/test/test_pipeline.py +++ b/test/test_pipeline.py @@ -708,6 +708,7 @@ def test_load_from_deepset_cloud_query(): assert isinstance(retriever, BM25Retriever) assert isinstance(document_store, DeepsetCloudDocumentStore) assert document_store == query_pipeline.get_document_store() + assert document_store.name == "DocumentStore" prediction = query_pipeline.run(query="man on horse", params={}) From bd3d44cad7f3ca2e505a2d22e2045158c6025e74 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 10 May 2022 14:06:06 +0000 Subject: [PATCH 2/8] Update Documentation & Code Style --- haystack/pipelines/base.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/haystack/pipelines/base.py b/haystack/pipelines/base.py index f7f1dc9814..a253cf44d0 100644 --- a/haystack/pipelines/base.py +++ b/haystack/pipelines/base.py @@ -1277,7 +1277,9 @@ def _load_or_get_component(cls, name: str, definitions: dict, components: dict): value ] # substitute reference (string) with the component object. - component_instance = BaseComponent._create_instance(component_type=component_type, component_params=component_params, name=name) + component_instance = BaseComponent._create_instance( + component_type=component_type, component_params=component_params, name=name + ) components[name] = component_instance return component_instance From d765b2db93e1896e2aa4df3b76ad1ba4ad2f6623 Mon Sep 17 00:00:00 2001 From: Thomas Stadelmann Date: Tue, 10 May 2022 16:13:19 +0200 Subject: [PATCH 3/8] update test --- test/test_pipeline_yaml.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test/test_pipeline_yaml.py b/test/test_pipeline_yaml.py index f1c5f3da77..c9cca17d6f 100644 --- a/test/test_pipeline_yaml.py +++ b/test/test_pipeline_yaml.py @@ -488,6 +488,9 @@ def __init__(self, other_node: OtherNode): ) pipeline = Pipeline.load_from_yaml(path=tmp_path / "tmp_config.yml") assert isinstance(pipeline.get_node("custom_node"), CustomNode) + assert isinstance(pipeline.get_node("custom_node").other_node, OtherNode) + assert pipeline.get_node("custom_node").name == "custom_node" + assert pipeline.get_node("custom_node").other_node.name == "other_node" def test_load_yaml_custom_component_with_helper_class_in_init(tmp_path): From d57a8876af214650b408b6531824e6ba93209004 Mon Sep 17 00:00:00 2001 From: Thomas Stadelmann Date: Tue, 17 May 2022 16:57:57 +0200 Subject: [PATCH 4/8] fix failing tests --- haystack/pipelines/base.py | 10 +++++++--- haystack/pipelines/utils.py | 12 ++++++++++-- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/haystack/pipelines/base.py b/haystack/pipelines/base.py index a253cf44d0..e9cce3feca 100644 --- a/haystack/pipelines/base.py +++ b/haystack/pipelines/base.py @@ -375,11 +375,15 @@ def add_node(self, component: BaseComponent, name: str, inputs: List[str]): self.graph = _init_pipeline_graph(root_node_name=candidate_roots[0]) component_definitions = get_component_definitions(pipeline_config=self.get_config()) + component_definition = {"params": component.get_params(), "type": component.type} - # Check for duplicates before adding the definition - if name in component_definitions.keys(): + # Check for duplicate names before adding the definition + # Note that the very same component must be addable multiple times: + # E.g. for indexing pipelines it's common to add a retriever first and a document store afterwards. The document store is already being used by the retriever however. + # Thus the very same document store will be added twice, first as a subcomponent of the retriever and second as a first level node. + if name in component_definitions.keys() and component_definitions[name] != component_definition: raise PipelineConfigError(f"A node named '{name}' is already in the pipeline. Choose another name.") - component_definitions[name] = component._component_config + component_definitions[name] = component_definition # Name any nested component before adding them component.name = name diff --git a/haystack/pipelines/utils.py b/haystack/pipelines/utils.py index 1b3d92aba3..1ec3e41586 100644 --- a/haystack/pipelines/utils.py +++ b/haystack/pipelines/utils.py @@ -74,6 +74,7 @@ def generate_code( component_definitions=component_definitions, component_variable_names=component_variable_names, dependency_graph=component_dependency_graph, + pipeline_definition=pipeline_definition, ) pipeline_code = _generate_pipeline_code( pipeline_definition=pipeline_definition, @@ -109,7 +110,10 @@ def _generate_pipeline_code( def _generate_components_code( - component_definitions: Dict[str, Any], component_variable_names: Dict[str, str], dependency_graph: DiGraph + component_definitions: Dict[str, Any], + component_variable_names: Dict[str, str], + dependency_graph: DiGraph, + pipeline_definition: Dict[str, Any], ) -> str: code = "" declarations = {} @@ -121,7 +125,11 @@ def _generate_components_code( for key, value in definition.get("params", {}).items() } init_args = ", ".join(f"{key}={value}" for key, value in param_value_dict.items()) - declarations[name] = f"{variable_name} = {class_name}({init_args})" + declaration = f"{variable_name} = {class_name}({init_args})" + # set name of subcomponents explicitly as it won't be set via Pipeline.add_node() + if name not in (node["name"] for node in pipeline_definition["nodes"]): + declaration = f'{declaration}\n{variable_name}.name = "{name}"' + declarations[name] = declaration ordered_components = nx.topological_sort(dependency_graph) ordered_declarations = [declarations[component] for component in ordered_components] From 3ee7d08b023955008f1a349ce2bb34c3ff404d03 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 17 May 2022 15:02:43 +0000 Subject: [PATCH 5/8] Update Documentation & Code Style --- haystack/pipelines/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/haystack/pipelines/base.py b/haystack/pipelines/base.py index e9cce3feca..febd1be888 100644 --- a/haystack/pipelines/base.py +++ b/haystack/pipelines/base.py @@ -378,7 +378,7 @@ def add_node(self, component: BaseComponent, name: str, inputs: List[str]): component_definition = {"params": component.get_params(), "type": component.type} # Check for duplicate names before adding the definition - # Note that the very same component must be addable multiple times: + # Note that the very same component must be addable multiple times: # E.g. for indexing pipelines it's common to add a retriever first and a document store afterwards. The document store is already being used by the retriever however. # Thus the very same document store will be added twice, first as a subcomponent of the retriever and second as a first level node. if name in component_definitions.keys() and component_definitions[name] != component_definition: From 48f10be24d565baade81c4389acf743c57fdca75 Mon Sep 17 00:00:00 2001 From: Thomas Stadelmann Date: Tue, 17 May 2022 20:30:36 +0200 Subject: [PATCH 6/8] fix even more tests --- haystack/pipelines/base.py | 38 ++++++++++++++++++++++++--------- haystack/pipelines/utils.py | 4 ++-- test/pipelines/test_pipeline.py | 3 +++ 3 files changed, 33 insertions(+), 12 deletions(-) diff --git a/haystack/pipelines/base.py b/haystack/pipelines/base.py index 0d1eebed95..c1178c878f 100644 --- a/haystack/pipelines/base.py +++ b/haystack/pipelines/base.py @@ -81,11 +81,28 @@ def root_node(self) -> Optional[str]: @property def components(self) -> Dict[str, BaseComponent]: - return { - name: attributes["component"] - for name, attributes in self.graph.nodes.items() + """ + Returns all components used by this pipeline. + Note that this also includes such components that are being utilized by other components only and are not being used as a pipeline node directly. + """ + node_components = [ + attributes["component"] + for attributes in self.graph.nodes.values() if not isinstance(attributes["component"], RootNode) - } + ] + all_components = self._find_all_components(node_components) + return {component.name: component for component in all_components if component.name is not None} + + def _find_all_components(self, components: List[BaseComponent]) -> Set[BaseComponent]: + """ + Finds all components given the provided components. + Components are found by traversing the provided components and their utilized components. + """ + distinct_components = set(components) + for component in components: + sub_components = self._find_all_components(component.utilized_components) + distinct_components.update(sub_components) + return distinct_components def to_code( self, pipeline_variable_name: str = "pipeline", generate_imports: bool = True, add_comment: bool = False @@ -373,15 +390,16 @@ def add_node(self, component: BaseComponent, name: str, inputs: List[str]): ) self.graph = _init_pipeline_graph(root_node_name=candidate_roots[0]) - component_definitions = get_component_definitions(pipeline_config=self.get_config()) - component_definition = {"params": component.get_params(), "type": component.type} - - # Check for duplicate names before adding the definition + # Check for duplicate names before adding the component # Note that the very same component must be addable multiple times: - # E.g. for indexing pipelines it's common to add a retriever first and a document store afterwards. The document store is already being used by the retriever however. + # E.g. for indexing pipelines it's common to add a retriever first and a document store afterwards. + # The document store is already being used by the retriever however. # Thus the very same document store will be added twice, first as a subcomponent of the retriever and second as a first level node. - if name in component_definitions.keys() and component_definitions[name] != component_definition: + if name in self.components and self.components[name] != component: raise PipelineConfigError(f"A node named '{name}' is already in the pipeline. Choose another name.") + + component_definitions = get_component_definitions(pipeline_config=self.get_config()) + component_definition = {"params": component.get_params(), "type": component.type} component_definitions[name] = component_definition # Name any nested component before adding them diff --git a/haystack/pipelines/utils.py b/haystack/pipelines/utils.py index 0c2b54e04a..b15429319e 100644 --- a/haystack/pipelines/utils.py +++ b/haystack/pipelines/utils.py @@ -126,8 +126,8 @@ def _generate_components_code( } init_args = ", ".join(f"{key}={value}" for key, value in param_value_dict.items()) declaration = f"{variable_name} = {class_name}({init_args})" - # set name of subcomponents explicitly as it won't be set via Pipeline.add_node() - if name not in (node["name"] for node in pipeline_definition["nodes"]): + # set name of subcomponents explicitly if it's not the default name as it won't be set via Pipeline.add_node() + if name != class_name and name not in (node["name"] for node in pipeline_definition["nodes"]): declaration = f'{declaration}\n{variable_name}.name = "{name}"' declarations[name] = declaration diff --git a/test/pipelines/test_pipeline.py b/test/pipelines/test_pipeline.py index 1f3f84f7a4..236e50555b 100644 --- a/test/pipelines/test_pipeline.py +++ b/test/pipelines/test_pipeline.py @@ -467,6 +467,7 @@ def test_generate_code_imports(): "from haystack.pipelines import Pipeline\n" "\n" "document_store = ElasticsearchDocumentStore()\n" + 'document_store.name = "DocumentStore"\n' "retri = BM25Retriever(document_store=document_store)\n" "retri_2 = TfidfRetriever(document_store=document_store)\n" "\n" @@ -497,6 +498,7 @@ def test_generate_code_imports_no_pipeline_cls(): "from haystack.nodes import BM25Retriever\n" "\n" "document_store = ElasticsearchDocumentStore()\n" + 'document_store.name = "DocumentStore"\n' "retri = BM25Retriever(document_store=document_store)\n" "\n" "p = Pipeline()\n" @@ -524,6 +526,7 @@ def test_generate_code_comment(): "from haystack.pipelines import Pipeline\n" "\n" "document_store = ElasticsearchDocumentStore()\n" + 'document_store.name = "DocumentStore"\n' "retri = BM25Retriever(document_store=document_store)\n" "\n" "p = Pipeline()\n" From 70ae14d346736263a5ba75a2307a827448be8e36 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 17 May 2022 18:32:42 +0000 Subject: [PATCH 7/8] Update Documentation & Code Style --- docs/_src/api/api/pipelines.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/docs/_src/api/api/pipelines.md b/docs/_src/api/api/pipelines.md index 7abf1f4501..00490169b2 100644 --- a/docs/_src/api/api/pipelines.md +++ b/docs/_src/api/api/pipelines.md @@ -25,6 +25,18 @@ def root_node() -> Optional[str] Returns the root node of the pipeline's graph. + + +#### Pipeline.components + +```python +@property +def components() -> Dict[str, BaseComponent] +``` + +Returns all components used by this pipeline. +Note that this also includes such components that are being utilized by other components only and are not being used as a pipeline node directly. + #### Pipeline.to\_code From cbb8d3a7c8bf7ccd7f9a2c1973bc4bd5661c328a Mon Sep 17 00:00:00 2001 From: Thomas Stadelmann Date: Wed, 18 May 2022 12:34:29 +0200 Subject: [PATCH 8/8] incorporate review feedback --- haystack/pipelines/base.py | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/haystack/pipelines/base.py b/haystack/pipelines/base.py index c1178c878f..fef289a4f8 100644 --- a/haystack/pipelines/base.py +++ b/haystack/pipelines/base.py @@ -85,21 +85,24 @@ def components(self) -> Dict[str, BaseComponent]: Returns all components used by this pipeline. Note that this also includes such components that are being utilized by other components only and are not being used as a pipeline node directly. """ - node_components = [ - attributes["component"] - for attributes in self.graph.nodes.values() - if not isinstance(attributes["component"], RootNode) - ] - all_components = self._find_all_components(node_components) + all_components = self._find_all_components() return {component.name: component for component in all_components if component.name is not None} - def _find_all_components(self, components: List[BaseComponent]) -> Set[BaseComponent]: + def _find_all_components(self, seed_components: List[BaseComponent] = None) -> Set[BaseComponent]: """ - Finds all components given the provided components. - Components are found by traversing the provided components and their utilized components. + Finds all components given the provided seed components. + Components are found by traversing the provided seed components and their utilized components. + If seed_components is None, the node components (except the root node) of the pipeline will be used as seed components. """ - distinct_components = set(components) - for component in components: + if seed_components is None: + seed_components = [ + attributes["component"] + for attributes in self.graph.nodes.values() + if not isinstance(attributes["component"], RootNode) + ] + + distinct_components = set(seed_components) + for component in seed_components: sub_components = self._find_all_components(component.utilized_components) distinct_components.update(sub_components) return distinct_components @@ -395,12 +398,11 @@ def add_node(self, component: BaseComponent, name: str, inputs: List[str]): # E.g. for indexing pipelines it's common to add a retriever first and a document store afterwards. # The document store is already being used by the retriever however. # Thus the very same document store will be added twice, first as a subcomponent of the retriever and second as a first level node. - if name in self.components and self.components[name] != component: + if name in self.components.keys() and self.components[name] != component: raise PipelineConfigError(f"A node named '{name}' is already in the pipeline. Choose another name.") component_definitions = get_component_definitions(pipeline_config=self.get_config()) - component_definition = {"params": component.get_params(), "type": component.type} - component_definitions[name] = component_definition + component_definitions[name] = component._component_config # Name any nested component before adding them component.name = name