From e20f2e0d541805c3afb1f0948fa85f88b2a4f434 Mon Sep 17 00:00:00 2001
From: tstadel <60758086+tstadel@users.noreply.github.com>
Date: Wed, 23 Feb 2022 11:08:57 +0100
Subject: [PATCH] Generate code from pipeline (pipeline.to_code()) (#2214)
* pipeline.to_code() with jupyter support
* Update Documentation & Code Style
* add imports
* refactoring
* Update Documentation & Code Style
* docstrings added and refactoring
* Update Documentation & Code Style
* improve imports code generation
* add comment param
* Update Documentation & Code Style
* add simple test
* add to_notebook_cell()
* Update Documentation & Code Style
* introduce helper classes for code gen and eval report gen
* add more tests
* Update Documentation & Code Style
* fix Dict typings
* Update Documentation & Code Style
* validate user input before code gen
* enable urls for to_code()
* Update Documentation & Code Style
* remove all chars except colon from validation regex
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
---
docs/_src/api/api/pipelines.md | 38 +++
haystack/pipelines/base.py | 501 +++++++++++++++++++++++++--------
test/test_pipeline.py | 282 ++++++++++++++++++-
3 files changed, 694 insertions(+), 127 deletions(-)
diff --git a/docs/_src/api/api/pipelines.md b/docs/_src/api/api/pipelines.md
index 7d68b13ed9..0c1c0eab2b 100644
--- a/docs/_src/api/api/pipelines.md
+++ b/docs/_src/api/api/pipelines.md
@@ -37,6 +37,44 @@ Returns a configuration for the Pipeline that can be used with `BasePipeline.loa
- `return_defaults`: whether to output parameters that have the default values.
+
+
+#### to\_code
+
+```python
+def to_code(pipeline_variable_name: str = "pipeline", generate_imports: bool = True, add_comment: bool = False) -> str
+```
+
+Returns the code to create this pipeline as a string.
+
+**Arguments**:
+
+- `pipeline_variable_name`: The variable name of the generated pipeline.
+Default value is 'pipeline'.
+- `generate_imports`: Whether to include the required import statements in the code.
+Default value is True.
+- `add_comment`: Whether to add a preceding comment noting that this code has been generated.
+Default value is False.
+
+
+
+#### to\_notebook\_cell
+
+```python
+def to_notebook_cell(pipeline_variable_name: str = "pipeline", generate_imports: bool = True, add_comment: bool = True)
+```
+
+Creates a new notebook cell with the code to create this pipeline.
+
+**Arguments**:
+
+- `pipeline_variable_name`: The variable name of the generated pipeline.
+Default value is 'pipeline'.
+- `generate_imports`: Whether to include the required import statements in the code.
+Default value is True.
+- `add_comment`: Whether to add a preceding comment noting that this code has been generated.
+Default value is True.
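+
+For example, from within a Jupyter notebook (a sketch, assuming `pipeline` is an existing `Pipeline` instance):
+
+```python
+# creates a new input cell below, pre-filled with the generated code
+pipeline.to_notebook_cell(pipeline_variable_name="pipeline")
+```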
+
#### load\_from\_config
diff --git a/haystack/pipelines/base.py b/haystack/pipelines/base.py
index eee5ded9a4..c9321d3542 100644
--- a/haystack/pipelines/base.py
+++ b/haystack/pipelines/base.py
@@ -1,4 +1,5 @@
from __future__ import annotations
+import sys
from typing import Dict, List, Optional, Any
import copy
@@ -6,6 +7,7 @@
import inspect
import logging
import os
+import re
import traceback
import numpy as np
import pandas as pd
@@ -40,6 +42,11 @@
ROOT_NODE_TO_PIPELINE_NAME = {"query": "query", "file": "indexing"}
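+# Used to derive snake_case variable names from CamelCase class names,
+# e.g. "ElasticsearchDocumentStore" -> "elasticsearch_document_store".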
+CAMEL_CASE_TO_SNAKE_CASE_REGEX = re.compile(r"(?<=[a-z])(?=[A-Z0-9])")
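+# Allows letters, digits and -_/.: so that URLs such as "http://localhost:9200" pass validation.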
+VALID_CODE_GEN_INPUT_REGEX = re.compile(r"^[-a-zA-Z0-9_/.:]+$")
+MODULE_NOT_FOUND = "MODULE_NOT_FOUND"
+CODE_GEN_ALLOWED_IMPORTS = ["haystack.document_stores", "haystack.nodes"]
+CODE_GEN_DEFAULT_COMMENT = "This code has been generated."
class RootNode(BaseComponent):
@@ -70,6 +77,54 @@ def get_config(self, return_defaults: bool = False) -> dict:
"""
raise NotImplementedError
+ def to_code(
+ self,
+ pipeline_variable_name: str = "pipeline",
+ generate_imports: bool = True,
+ add_comment: bool = False,
+ ) -> str:
+ """
+ Returns the code to create this pipeline as a string.
+
+ :param pipeline_variable_name: The variable name of the generated pipeline.
+ Default value is 'pipeline'.
+ :param generate_imports: Whether to include the required import statements in the code.
+ Default value is True.
+ :param add_comment: Whether to add a preceding comment noting that this code has been generated.
+ Default value is False.
+ """
+ code = _PipelineCodeGen.generate_code(
+ pipeline=self,
+ pipeline_variable_name=pipeline_variable_name,
+ generate_imports=generate_imports,
+ comment=CODE_GEN_DEFAULT_COMMENT if add_comment else None,
+ )
+ return code
+
+ def to_notebook_cell(
+ self,
+ pipeline_variable_name: str = "pipeline",
+ generate_imports: bool = True,
+ add_comment: bool = True,
+ ):
+ """
+ Creates a new notebook cell with the code to create this pipeline.
+
+ :param pipeline_variable_name: The variable name of the generated pipeline.
+ Default value is 'pipeline'.
+ :param generate_imports: Whether to include the required import statements in the code.
+ Default value is True.
+ :param add_comment: Whether to add a preceding comment noting that this code has been generated.
+ Default value is True.
+ """
+ code = self.to_code(
+ pipeline_variable_name=pipeline_variable_name, generate_imports=generate_imports, add_comment=add_comment
+ )
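+ # get_ipython() is only defined when running inside IPython/Jupyter;
+ # outside a notebook it raises a NameError, which is caught and logged below.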
+ try:
+ get_ipython().set_next_input(code) # type: ignore
+ except NameError:
+ logger.error("Could not create notebook cell. Make sure you're running in a notebook environment.")
+
@classmethod
def load_from_config(
cls, pipeline_config: Dict, pipeline_name: Optional[str] = None, overwrite_with_env_variables: bool = True
@@ -347,7 +402,9 @@ def save_to_deepset_cloud(
logger.info(f"Pipeline config '{pipeline_config_name}' successfully created.")
@classmethod
- def _get_pipeline_definition(cls, pipeline_config: Dict, pipeline_name: Optional[str] = None):
+ def _get_pipeline_definition(
+ cls, pipeline_config: Dict[str, Any], pipeline_name: Optional[str] = None
+ ) -> Dict[str, Any]:
"""
Get the definition of Pipeline from a given pipeline config. If the config contains more than one Pipeline,
then the pipeline_name must be supplied.
@@ -369,7 +426,9 @@ def _get_pipeline_definition(cls, pipeline_config: Dict, pipeline_name: Optional
return pipeline_definition
@classmethod
- def _get_component_definitions(cls, pipeline_config: Dict, overwrite_with_env_variables: bool):
+ def _get_component_definitions(
+ cls, pipeline_config: Dict[str, Any], overwrite_with_env_variables: bool
+ ) -> Dict[str, Any]:
"""
Returns the definitions of all components from a given pipeline config.
@@ -390,7 +449,7 @@ def _get_component_definitions(cls, pipeline_config: Dict, overwrite_with_env_va
return component_definitions
@classmethod
- def _overwrite_with_env_variables(cls, definition: dict):
+ def _overwrite_with_env_variables(cls, definition: Dict[str, Any]):
"""
Overwrite the pipeline config with environment variables. For example, to change index name param for an
ElasticsearchDocumentStore, an env variable 'MYDOCSTORE_PARAMS_INDEX=documents-2021' can be set. Note that an
@@ -1019,13 +1078,11 @@ def get_config(self, return_defaults: bool = False) -> dict:
:param return_defaults: whether to output parameters that have the default values.
"""
- nodes = self.graph.nodes
-
pipeline_name = ROOT_NODE_TO_PIPELINE_NAME[self.root_node.lower()]
pipelines: dict = {pipeline_name: {"name": pipeline_name, "type": self.__class__.__name__, "nodes": []}}
components = {}
- for node in nodes:
+ for node in self.graph.nodes:
if node == self.root_node:
continue
component_instance = self.graph.nodes.get(node)["component"]
@@ -1038,25 +1095,37 @@ def get_config(self, return_defaults: bool = False) -> dict:
for component_parent in component_parent_classes:
component_signature = {**component_signature, **inspect.signature(component_parent).parameters}
- for key, value in component_params.items():
+ for param_key, param_value in component_params.items():
# A parameter for a Component could be another Component. For instance, a Retriever has
# the DocumentStore as a parameter.
# Component configs must be a dict with a "type" key. The "type" key distinguishes them from
# other parameters like "custom_mapping" that are dicts.
# This currently only checks for the single-level nesting case, wherein "a Component has another
# Component as a parameter". For deeper nesting cases, this function should be made recursive.
- if isinstance(value, dict) and "type" in value.keys(): # the parameter is a Component
- components[node]["params"][key] = value["type"]
- sub_component_signature = inspect.signature(BaseComponent.subclasses[value["type"]]).parameters
- params = {
+ if isinstance(param_value, dict) and "type" in param_value.keys(): # the parameter is a Component
+ sub_component = param_value
+ sub_component_type_name = sub_component["type"]
+ sub_component_signature = inspect.signature(
+ BaseComponent.subclasses[sub_component_type_name]
+ ).parameters
+ sub_component_params = {
k: v
- for k, v in value["params"].items()
+ for k, v in sub_component["params"].items()
if sub_component_signature[k].default != v or return_defaults is True
}
- components[value["type"]] = {"name": value["type"], "type": value["type"], "params": params}
+
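+ # name the sub-component; distinct instances of the same type get numbered suffixes (e.g. "ElasticsearchDocumentStore_2")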
+ sub_component_name = self._generate_component_name(
+ type_name=sub_component_type_name, params=sub_component_params, existing_components=components
+ )
+ components[sub_component_name] = {
+ "name": sub_component_name,
+ "type": sub_component_type_name,
+ "params": sub_component_params,
+ }
+ components[node]["params"][param_key] = sub_component_name
else:
- if component_signature[key].default != value or return_defaults is True:
- components[node]["params"][key] = value
+ if component_signature[param_key].default != param_value or return_defaults is True:
+ components[node]["params"][param_key] = param_value
# create the Pipeline definition with how the Component are connected
pipelines[pipeline_name]["nodes"].append({"name": node, "inputs": list(self.graph.predecessors(node))})
@@ -1068,84 +1137,21 @@ def get_config(self, return_defaults: bool = False) -> dict:
}
return config
- def _format_document_answer(self, document_or_answer: dict):
- return "\n \t".join([f"{name}: {value}" for name, value in document_or_answer.items()])
-
- def _format_wrong_sample(self, query: dict):
- metrics = "\n \t".join([f"{name}: {value}" for name, value in query["metrics"].items()])
- documents = "\n\n \t".join([self._format_document_answer(doc) for doc in query.get("documents", [])])
- documents = f"Documents: \n \t{documents}\n" if len(documents) > 0 else ""
- answers = "\n\n \t".join([self._format_document_answer(answer) for answer in query.get("answers", [])])
- answers = f"Answers: \n \t{answers}\n" if len(answers) > 0 else ""
- gold_document_ids = "\n \t".join(query["gold_document_ids"])
- gold_answers = "\n \t".join(query.get("gold_answers", []))
- gold_answers = f"Gold Answers: \n \t{gold_answers}\n" if len(gold_answers) > 0 else ""
- s = (
- f"Query: \n \t{query['query']}\n"
- f"{gold_answers}"
- f"Gold Document Ids: \n \t{gold_document_ids}\n"
- f"Metrics: \n \t{metrics}\n"
- f"{answers}"
- f"{documents}"
- f"_______________________________________________________"
- )
- return s
-
- def _format_wrong_samples_node(self, node_name: str, wrong_samples_formatted: str):
- s = (
- f" Wrong {node_name} Examples\n"
- f"=======================================================\n"
- f"{wrong_samples_formatted}\n"
- f"=======================================================\n"
- )
- return s
-
- def _format_wrong_samples_report(self, eval_result: EvaluationResult, n_wrong_examples: int = 3):
- examples = {
- node: eval_result.wrong_examples(node, doc_relevance_col="gold_id_or_answer_match", n=n_wrong_examples)
- for node in eval_result.node_results.keys()
- }
- examples_formatted = {
- node: "\n".join([self._format_wrong_sample(example) for example in examples])
- for node, examples in examples.items()
- }
-
- return "\n".join(
- [self._format_wrong_samples_node(node, examples) for node, examples in examples_formatted.items()]
- )
-
- def _format_pipeline_node(self, node: str, calculated_metrics: dict):
- node_metrics: dict = {}
- for metric_mode in calculated_metrics:
- for metric, value in calculated_metrics[metric_mode].get(node, {}).items():
- node_metrics[f"{metric}{metric_mode}"] = value
-
- node_metrics_formatted = "\n".join(
- sorted([f" | {metric}: {value:5.3}" for metric, value in node_metrics.items()])
- )
- node_metrics_formatted = f"{node_metrics_formatted}\n" if len(node_metrics_formatted) > 0 else ""
- s = (
- f" {node}\n"
- f" |\n"
- f"{node_metrics_formatted}"
- f" |"
- )
- return s
-
- def _format_pipeline_overview(self, calculated_metrics: dict):
- pipeline_overview = "\n".join(
- [self._format_pipeline_node(node, calculated_metrics) for node in self.graph.nodes]
- )
- s = (
- f"================== Evaluation Report ==================\n"
- f"=======================================================\n"
- f" Pipeline Overview\n"
- f"=======================================================\n"
- f"{pipeline_overview}\n"
- f" Output\n"
- f"=======================================================\n"
- )
- return s
+ def _generate_component_name(
+ self,
+ type_name: str,
+ params: Dict[str, Any],
+ existing_components: Dict[str, Any],
+ ):
+ component_name: str = type_name
+ # append a numeric suffix if there are multiple distinct components of the same type
+ while component_name in existing_components and params != existing_components[component_name]["params"]:
+ occupied_num = 1
+ if len(component_name) > len(type_name):
+ occupied_num = int(component_name[len(type_name) + 1 :])
+ new_num = occupied_num + 1
+ component_name = f"{type_name}_{new_num}"
+ return component_name
def print_eval_report(
self,
@@ -1160,36 +1166,10 @@ def print_eval_report(
:param n_wrong_examples: The number of worst queries to show.
:param metrics_filter: The metrics to show per node. If None all metrics will be shown.
"""
- if any(degree > 1 for node, degree in self.graph.out_degree):
- logger.warning("Pipelines with junctions are currently not supported.")
- return
-
- calculated_metrics = {
- "": eval_result.calculate_metrics(doc_relevance_col="gold_id_or_answer_match"),
- "_top_1": eval_result.calculate_metrics(
- doc_relevance_col="gold_id_or_answer_match", simulated_top_k_reader=1
- ),
- " upper bound": eval_result.calculate_metrics(
- doc_relevance_col="gold_id_or_answer_match", eval_mode="isolated"
- ),
- }
-
- if metrics_filter is not None:
- for metric_mode in calculated_metrics:
- calculated_metrics[metric_mode] = {
- node: metrics
- if node not in metrics_filter
- else {metric: value for metric, value in metrics.items() if metric in metrics_filter[node]}
- for node, metrics in calculated_metrics[metric_mode].items()
- }
-
- pipeline_overview = self._format_pipeline_overview(calculated_metrics)
- wrong_samples_report = self._format_wrong_samples_report(
- eval_result=eval_result, n_wrong_examples=n_wrong_examples
+ _PipelineEvalReportGen.print_eval_report(
+ eval_result=eval_result, pipeline=self, n_wrong_examples=n_wrong_examples, metrics_filter=metrics_filter
)
- print(f"{pipeline_overview}\n" f"{wrong_samples_report}")
-
class RayPipeline(Pipeline):
"""
@@ -1486,3 +1466,282 @@ def __call__(self, *args, **kwargs):
Ray calls this method which is then re-directed to the corresponding component's run().
"""
return self.node._dispatch_run(*args, **kwargs)
+
+
+class _PipelineCodeGen:
+ @classmethod
+ def _camel_to_snake_case(cls, input: str) -> str:
+ return CAMEL_CASE_TO_SNAKE_CASE_REGEX.sub("_", input).lower()
+
+ @classmethod
+ def _validate_user_input(cls, input: str):
+ if isinstance(input, str) and not VALID_CODE_GEN_INPUT_REGEX.match(input):
+ raise ValueError(f"'{input}' is not a valid code gen variable name. Use word characters only.")
+
+ @classmethod
+ def _validate_config(cls, pipeline_config: Dict[str, Any]):
+ for component in pipeline_config["components"]:
+ cls._validate_user_input(component["name"])
+ cls._validate_user_input(component["type"])
+ for k, v in component.get("params", {}).items():
+ cls._validate_user_input(k)
+ cls._validate_user_input(v)
+ for pipeline in pipeline_config["pipelines"]:
+ cls._validate_user_input(pipeline["name"])
+ cls._validate_user_input(pipeline["type"])
+ for node in pipeline["nodes"]:
+ cls._validate_user_input(node["name"])
+ for input in node["inputs"]:
+ cls._validate_user_input(input)
+
+ @classmethod
+ def generate_code(
+ cls,
+ pipeline: BasePipeline,
+ pipeline_variable_name: str = "pipeline",
+ generate_imports: bool = True,
+ comment: Optional[str] = None,
+ ) -> str:
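+ # Builds runnable code from the pipeline's config: optional imports, component declarations, and add_node calls.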
+ pipeline_config = pipeline.get_config()
+ cls._validate_config(pipeline_config)
+
+ component_definitions = pipeline._get_component_definitions(
+ pipeline_config=pipeline_config, overwrite_with_env_variables=False
+ )
+ component_variable_names = {name: cls._camel_to_snake_case(name) for name in component_definitions.keys()}
+ pipeline_definition = pipeline._get_pipeline_definition(pipeline_config=pipeline_config)
+
+ code_parts = []
+ if generate_imports:
+ types_to_import = [component["type"] for component in component_definitions.values()]
+ imports_code = cls._generate_imports_code(types_to_import=types_to_import)
+ code_parts.append(imports_code)
+
+ components_code = cls._generate_components_code(
+ component_definitions=component_definitions, component_variable_names=component_variable_names
+ )
+ pipeline_code = cls._generate_pipeline_code(
+ pipeline_definition=pipeline_definition,
+ component_variable_names=component_variable_names,
+ pipeline_variable_name=pipeline_variable_name,
+ )
+
+ code_parts.append(components_code)
+ code_parts.append(pipeline_code)
+ code = "\n\n".join(code_parts)
+
+ if comment:
+ comment = re.sub(r"^(#\s)?", "# ", comment, flags=re.MULTILINE)
+ code = "\n".join([comment, code])
+
+ return code
+
+ @classmethod
+ def _generate_pipeline_code(
+ cls, pipeline_definition: Dict[str, Any], component_variable_names: Dict[str, str], pipeline_variable_name: str
+ ) -> str:
+ code_lines = [f"{pipeline_variable_name} = Pipeline()"]
+ for node in pipeline_definition["nodes"]:
+ node_name = node["name"]
+ component_variable_name = component_variable_names[node_name]
+ inputs = ", ".join(f'"{name}"' for name in node["inputs"])
+ code_lines.append(
+ f'{pipeline_variable_name}.add_node(component={component_variable_name}, name="{node_name}", inputs=[{inputs}])'
+ )
+
+ code = "\n".join(code_lines)
+ return code
+
+ @classmethod
+ def _generate_components_code(
+ cls, component_definitions: Dict[str, Any], component_variable_names: Dict[str, str]
+ ) -> str:
+ code = ""
+ declarations = {}
+ dependency_map = {}
+ for name, definition in component_definitions.items():
+ variable_name = component_variable_names[name]
+ class_name = definition["type"]
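+ # String params that name another component become variable references; all other strings are quoted.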
+ param_value_dict = {
+ key: component_variable_names.get(value, f'"{value}"') if isinstance(value, str) else value
+ for key, value in definition["params"].items()
+ }
+ init_args = ", ".join(f"{key}={value}" for key, value in param_value_dict.items())
+ declarations[name] = f"{variable_name} = {class_name}({init_args})"
+ dependency_map[name] = [
+ param_value for param_value in definition["params"].values() if param_value in component_variable_names
+ ]
+
+ ordered_components = cls._order_components(dependency_map=dependency_map)
+ ordered_declarations = [declarations[component] for component in ordered_components]
+ code = "\n".join(ordered_declarations)
+ return code
+
+ @classmethod
+ def _generate_imports_code(cls, types_to_import: List[str]) -> str:
+ code_lines = []
+ importable_classes = {
+ name: mod
+ for mod in CODE_GEN_ALLOWED_IMPORTS
+ for name, obj in inspect.getmembers(sys.modules[mod])
+ if inspect.isclass(obj)
+ }
+
+ imports_by_module: Dict[str, List[str]] = {}
+ for t in types_to_import:
+ mod = importable_classes.get(t, MODULE_NOT_FOUND)
+ if mod in imports_by_module:
+ imports_by_module[mod].append(t)
+ else:
+ imports_by_module[mod] = [t]
+
+ for mod in sorted(imports_by_module.keys()):
+ sorted_types = sorted(set(imports_by_module[mod]))
+ import_types = ", ".join(sorted_types)
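+ # Types whose module could not be resolved are emitted as commented-out imports rather than failing.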
+ line_prefix = "# " if mod == MODULE_NOT_FOUND else ""
+ code_lines.append(f"{line_prefix}from {mod} import {import_types}")
+
+ code = "\n".join(code_lines)
+ return code
+
+ @classmethod
+ def _order_components(
+ cls, dependency_map: Dict[str, List[str]], components_to_order: Optional[List[str]] = None
+ ) -> List[str]:
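+ # Depth-first ordering: each component is declared only after all components it depends on.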
+ ordered_components = []
+ if components_to_order is None:
+ components_to_order = list(dependency_map.keys())
+ for component in components_to_order:
+ dependencies = dependency_map[component]
+ ordered_dependencies = cls._order_components(
+ dependency_map=dependency_map, components_to_order=dependencies
+ )
+ ordered_components += [d for d in ordered_dependencies if d not in ordered_components]
+ if component not in ordered_components:
+ ordered_components.append(component)
+ return ordered_components
+
+
+class _PipelineEvalReportGen:
+ @classmethod
+ def print_eval_report(
+ cls,
+ eval_result: EvaluationResult,
+ pipeline: Pipeline,
+ n_wrong_examples: int = 3,
+ metrics_filter: Optional[Dict[str, List[str]]] = None,
+ ):
+ if any(degree > 1 for node, degree in pipeline.graph.out_degree):
+ logger.warning("Pipelines with junctions are currently not supported.")
+ return
+
+ calculated_metrics = {
+ "": eval_result.calculate_metrics(doc_relevance_col="gold_id_or_answer_match"),
+ "_top_1": eval_result.calculate_metrics(
+ doc_relevance_col="gold_id_or_answer_match", simulated_top_k_reader=1
+ ),
+ " upper bound": eval_result.calculate_metrics(
+ doc_relevance_col="gold_id_or_answer_match", eval_mode="isolated"
+ ),
+ }
+
+ if metrics_filter is not None:
+ for metric_mode in calculated_metrics:
+ calculated_metrics[metric_mode] = {
+ node: metrics
+ if node not in metrics_filter
+ else {metric: value for metric, value in metrics.items() if metric in metrics_filter[node]}
+ for node, metrics in calculated_metrics[metric_mode].items()
+ }
+
+ pipeline_overview = cls._format_pipeline_overview(calculated_metrics=calculated_metrics, pipeline=pipeline)
+ wrong_samples_report = cls._format_wrong_samples_report(
+ eval_result=eval_result, n_wrong_examples=n_wrong_examples
+ )
+
+ print(f"{pipeline_overview}\n" f"{wrong_samples_report}")
+
+ @classmethod
+ def _format_document_answer(cls, document_or_answer: dict):
+ return "\n \t".join([f"{name}: {value}" for name, value in document_or_answer.items()])
+
+ @classmethod
+ def _format_wrong_sample(cls, query: dict):
+ metrics = "\n \t".join([f"{name}: {value}" for name, value in query["metrics"].items()])
+ documents = "\n\n \t".join([cls._format_document_answer(doc) for doc in query.get("documents", [])])
+ documents = f"Documents: \n \t{documents}\n" if len(documents) > 0 else ""
+ answers = "\n\n \t".join([cls._format_document_answer(answer) for answer in query.get("answers", [])])
+ answers = f"Answers: \n \t{answers}\n" if len(answers) > 0 else ""
+ gold_document_ids = "\n \t".join(query["gold_document_ids"])
+ gold_answers = "\n \t".join(query.get("gold_answers", []))
+ gold_answers = f"Gold Answers: \n \t{gold_answers}\n" if len(gold_answers) > 0 else ""
+ s = (
+ f"Query: \n \t{query['query']}\n"
+ f"{gold_answers}"
+ f"Gold Document Ids: \n \t{gold_document_ids}\n"
+ f"Metrics: \n \t{metrics}\n"
+ f"{answers}"
+ f"{documents}"
+ f"_______________________________________________________"
+ )
+ return s
+
+ @classmethod
+ def _format_wrong_samples_node(cls, node_name: str, wrong_samples_formatted: str):
+ s = (
+ f" Wrong {node_name} Examples\n"
+ f"=======================================================\n"
+ f"{wrong_samples_formatted}\n"
+ f"=======================================================\n"
+ )
+ return s
+
+ @classmethod
+ def _format_wrong_samples_report(cls, eval_result: EvaluationResult, n_wrong_examples: int = 3):
+ examples = {
+ node: eval_result.wrong_examples(node, doc_relevance_col="gold_id_or_answer_match", n=n_wrong_examples)
+ for node in eval_result.node_results.keys()
+ }
+ examples_formatted = {
+ node: "\n".join([cls._format_wrong_sample(example) for example in examples])
+ for node, examples in examples.items()
+ }
+
+ return "\n".join(
+ [cls._format_wrong_samples_node(node, examples) for node, examples in examples_formatted.items()]
+ )
+
+ @classmethod
+ def _format_pipeline_node(cls, node: str, calculated_metrics: dict):
+ node_metrics: dict = {}
+ for metric_mode in calculated_metrics:
+ for metric, value in calculated_metrics[metric_mode].get(node, {}).items():
+ node_metrics[f"{metric}{metric_mode}"] = value
+
+ node_metrics_formatted = "\n".join(
+ sorted([f" | {metric}: {value:5.3}" for metric, value in node_metrics.items()])
+ )
+ node_metrics_formatted = f"{node_metrics_formatted}\n" if len(node_metrics_formatted) > 0 else ""
+ s = (
+ f" {node}\n"
+ f" |\n"
+ f"{node_metrics_formatted}"
+ f" |"
+ )
+ return s
+
+ @classmethod
+ def _format_pipeline_overview(cls, calculated_metrics: dict, pipeline: Pipeline):
+ pipeline_overview = "\n".join(
+ [cls._format_pipeline_node(node, calculated_metrics) for node in pipeline.graph.nodes]
+ )
+ s = (
+ f"================== Evaluation Report ==================\n"
+ f"=======================================================\n"
+ f" Pipeline Overview\n"
+ f"=======================================================\n"
+ f"{pipeline_overview}\n"
+ f" Output\n"
+ f"=======================================================\n"
+ )
+ return s
diff --git a/test/test_pipeline.py b/test/test_pipeline.py
index 73bfb05864..0b4387e3cd 100644
--- a/test/test_pipeline.py
+++ b/test/test_pipeline.py
@@ -10,15 +10,13 @@
from haystack.document_stores.base import BaseDocumentStore
from haystack.document_stores.deepsetcloud import DeepsetCloudDocumentStore
from haystack.document_stores.elasticsearch import ElasticsearchDocumentStore
+from haystack.document_stores.memory import InMemoryDocumentStore
+from haystack.nodes.other.join_docs import JoinDocuments
from haystack.nodes.base import BaseComponent
from haystack.nodes.retriever.base import BaseRetriever
from haystack.nodes.retriever.sparse import ElasticsearchRetriever
-from haystack.pipelines import (
- Pipeline,
- DocumentSearchPipeline,
- RootNode,
-)
-from haystack.pipelines import ExtractiveQAPipeline
+from haystack.pipelines import Pipeline, DocumentSearchPipeline, RootNode, ExtractiveQAPipeline
+from haystack.pipelines.base import _PipelineCodeGen
from haystack.nodes import DensePassageRetriever, EmbeddingRetriever
from conftest import MOCK_DC, DC_API_ENDPOINT, DC_API_KEY, DC_TEST_INDEX, SAMPLES_PATH, deepset_cloud_fixture
@@ -166,6 +164,278 @@ def test_load_tfidfretriever_yaml(tmp_path):
assert prediction["answers"][0].answer == "haystack"
+@pytest.mark.elasticsearch
+def test_to_code():
+ index_pipeline = Pipeline.load_from_yaml(
+ SAMPLES_PATH / "pipeline" / "test_pipeline.yaml", pipeline_name="indexing_pipeline"
+ )
+ query_pipeline = Pipeline.load_from_yaml(
+ SAMPLES_PATH / "pipeline" / "test_pipeline.yaml", pipeline_name="query_pipeline"
+ )
+ query_pipeline_code = query_pipeline.to_code(pipeline_variable_name="query_pipeline_from_code")
+ index_pipeline_code = index_pipeline.to_code(pipeline_variable_name="index_pipeline_from_code")
+ exec(query_pipeline_code)
+ exec(index_pipeline_code)
+ assert locals()["query_pipeline_from_code"] is not None
+ assert locals()["index_pipeline_from_code"] is not None
+ assert query_pipeline.get_config() == locals()["query_pipeline_from_code"].get_config()
+ assert index_pipeline.get_config() == locals()["index_pipeline_from_code"].get_config()
+
+
+@pytest.mark.elasticsearch
+def test_PipelineCodeGen_simple_sparse_pipeline():
+ doc_store = ElasticsearchDocumentStore(index="my-index")
+ retriever = ElasticsearchRetriever(document_store=doc_store, top_k=20)
+ pipeline = Pipeline()
+ pipeline.add_node(component=retriever, name="retri", inputs=["Query"])
+
+ code = _PipelineCodeGen.generate_code(pipeline=pipeline, pipeline_variable_name="p", generate_imports=False)
+ assert code == (
+ 'elasticsearch_document_store = ElasticsearchDocumentStore(index="my-index")\n'
+ "retri = ElasticsearchRetriever(document_store=elasticsearch_document_store, top_k=20)\n"
+ "\n"
+ "p = Pipeline()\n"
+ 'p.add_node(component=retri, name="retri", inputs=["Query"])'
+ )
+
+
+@pytest.mark.elasticsearch
+def test_PipelineCodeGen_dual_retriever_pipeline():
+ es_doc_store = ElasticsearchDocumentStore(index="my-index")
+ es_retriever = ElasticsearchRetriever(document_store=es_doc_store, top_k=20)
+ dense_doc_store = InMemoryDocumentStore(index="my-index")
+ emb_retriever = EmbeddingRetriever(
+ document_store=dense_doc_store, embedding_model="sentence-transformers/all-MiniLM-L6-v2"
+ )
+ p_ensemble = Pipeline()
+ p_ensemble.add_node(component=es_retriever, name="EsRetriever", inputs=["Query"])
+ p_ensemble.add_node(component=emb_retriever, name="EmbeddingRetriever", inputs=["Query"])
+ p_ensemble.add_node(
+ component=JoinDocuments(join_mode="merge"), name="JoinResults", inputs=["EsRetriever", "EmbeddingRetriever"]
+ )
+
+ code = _PipelineCodeGen.generate_code(pipeline=p_ensemble, pipeline_variable_name="p", generate_imports=False)
+ assert code == (
+ 'elasticsearch_document_store = ElasticsearchDocumentStore(index="my-index")\n'
+ "es_retriever = ElasticsearchRetriever(document_store=elasticsearch_document_store, top_k=20)\n"
+ 'in_memory_document_store = InMemoryDocumentStore(index="my-index")\n'
+ 'embedding_retriever = EmbeddingRetriever(document_store=in_memory_document_store, embedding_model="sentence-transformers/all-MiniLM-L6-v2")\n'
+ 'join_results = JoinDocuments(join_mode="merge")\n'
+ "\n"
+ "p = Pipeline()\n"
+ 'p.add_node(component=es_retriever, name="EsRetriever", inputs=["Query"])\n'
+ 'p.add_node(component=embedding_retriever, name="EmbeddingRetriever", inputs=["Query"])\n'
+ 'p.add_node(component=join_results, name="JoinResults", inputs=["EsRetriever", "EmbeddingRetriever"])'
+ )
+
+
+@pytest.mark.elasticsearch
+def test_PipelineCodeGen_dual_retriever_pipeline_same_docstore():
+ es_doc_store = ElasticsearchDocumentStore(index="my-index")
+ es_retriever = ElasticsearchRetriever(document_store=es_doc_store, top_k=20)
+ emb_retriever = EmbeddingRetriever(
+ document_store=es_doc_store, embedding_model="sentence-transformers/all-MiniLM-L6-v2"
+ )
+ p_ensemble = Pipeline()
+ p_ensemble.add_node(component=es_retriever, name="EsRetriever", inputs=["Query"])
+ p_ensemble.add_node(component=emb_retriever, name="EmbeddingRetriever", inputs=["Query"])
+ p_ensemble.add_node(
+ component=JoinDocuments(join_mode="merge"), name="JoinResults", inputs=["EsRetriever", "EmbeddingRetriever"]
+ )
+
+ code = _PipelineCodeGen.generate_code(pipeline=p_ensemble, pipeline_variable_name="p", generate_imports=False)
+ assert code == (
+ 'elasticsearch_document_store = ElasticsearchDocumentStore(index="my-index")\n'
+ "es_retriever = ElasticsearchRetriever(document_store=elasticsearch_document_store, top_k=20)\n"
+ 'embedding_retriever = EmbeddingRetriever(document_store=elasticsearch_document_store, embedding_model="sentence-transformers/all-MiniLM-L6-v2")\n'
+ 'join_results = JoinDocuments(join_mode="merge")\n'
+ "\n"
+ "p = Pipeline()\n"
+ 'p.add_node(component=es_retriever, name="EsRetriever", inputs=["Query"])\n'
+ 'p.add_node(component=embedding_retriever, name="EmbeddingRetriever", inputs=["Query"])\n'
+ 'p.add_node(component=join_results, name="JoinResults", inputs=["EsRetriever", "EmbeddingRetriever"])'
+ )
+
+
+@pytest.mark.elasticsearch
+def test_PipelineCodeGen_dual_retriever_pipeline_different_docstore():
+ es_doc_store_a = ElasticsearchDocumentStore(index="my-index-a")
+ es_doc_store_b = ElasticsearchDocumentStore(index="my-index-b")
+ es_retriever = ElasticsearchRetriever(document_store=es_doc_store_a, top_k=20)
+ emb_retriever = EmbeddingRetriever(
+ document_store=es_doc_store_b, embedding_model="sentence-transformers/all-MiniLM-L6-v2"
+ )
+ p_ensemble = Pipeline()
+ p_ensemble.add_node(component=es_retriever, name="EsRetriever", inputs=["Query"])
+ p_ensemble.add_node(component=emb_retriever, name="EmbeddingRetriever", inputs=["Query"])
+ p_ensemble.add_node(
+ component=JoinDocuments(join_mode="merge"), name="JoinResults", inputs=["EsRetriever", "EmbeddingRetriever"]
+ )
+
+ code = _PipelineCodeGen.generate_code(pipeline=p_ensemble, pipeline_variable_name="p", generate_imports=False)
+ assert code == (
+ 'elasticsearch_document_store = ElasticsearchDocumentStore(index="my-index-a")\n'
+ "es_retriever = ElasticsearchRetriever(document_store=elasticsearch_document_store, top_k=20)\n"
+ 'elasticsearch_document_store_2 = ElasticsearchDocumentStore(index="my-index-b")\n'
+ 'embedding_retriever = EmbeddingRetriever(document_store=elasticsearch_document_store_2, embedding_model="sentence-transformers/all-MiniLM-L6-v2")\n'
+ 'join_results = JoinDocuments(join_mode="merge")\n'
+ "\n"
+ "p = Pipeline()\n"
+ 'p.add_node(component=es_retriever, name="EsRetriever", inputs=["Query"])\n'
+ 'p.add_node(component=embedding_retriever, name="EmbeddingRetriever", inputs=["Query"])\n'
+ 'p.add_node(component=join_results, name="JoinResults", inputs=["EsRetriever", "EmbeddingRetriever"])'
+ )
+
+
+@pytest.mark.elasticsearch
+def test_PipelineCodeGen_dual_retriever_pipeline_same_type():
+ es_doc_store = ElasticsearchDocumentStore(index="my-index")
+ es_retriever_1 = ElasticsearchRetriever(document_store=es_doc_store, top_k=20)
+ es_retriever_2 = ElasticsearchRetriever(document_store=es_doc_store, top_k=10)
+ p_ensemble = Pipeline()
+ p_ensemble.add_node(component=es_retriever_1, name="EsRetriever1", inputs=["Query"])
+ p_ensemble.add_node(component=es_retriever_2, name="EsRetriever2", inputs=["Query"])
+ p_ensemble.add_node(
+ component=JoinDocuments(join_mode="merge"), name="JoinResults", inputs=["EsRetriever1", "EsRetriever2"]
+ )
+
+ code = _PipelineCodeGen.generate_code(pipeline=p_ensemble, pipeline_variable_name="p", generate_imports=False)
+ assert code == (
+ 'elasticsearch_document_store = ElasticsearchDocumentStore(index="my-index")\n'
+ "es_retriever_1 = ElasticsearchRetriever(document_store=elasticsearch_document_store, top_k=20)\n"
+ "es_retriever_2 = ElasticsearchRetriever(document_store=elasticsearch_document_store)\n"
+ 'join_results = JoinDocuments(join_mode="merge")\n'
+ "\n"
+ "p = Pipeline()\n"
+ 'p.add_node(component=es_retriever_1, name="EsRetriever1", inputs=["Query"])\n'
+ 'p.add_node(component=es_retriever_2, name="EsRetriever2", inputs=["Query"])\n'
+ 'p.add_node(component=join_results, name="JoinResults", inputs=["EsRetriever1", "EsRetriever2"])'
+ )
+
+
+@pytest.mark.elasticsearch
+def test_PipelineCodeGen_imports():
+ doc_store = ElasticsearchDocumentStore(index="my-index")
+ retriever = ElasticsearchRetriever(document_store=doc_store, top_k=20)
+ pipeline = Pipeline()
+ pipeline.add_node(component=retriever, name="retri", inputs=["Query"])
+
+ code = _PipelineCodeGen.generate_code(pipeline=pipeline, pipeline_variable_name="p", generate_imports=True)
+ assert code == (
+ "from haystack.document_stores import ElasticsearchDocumentStore\n"
+ "from haystack.nodes import ElasticsearchRetriever\n"
+ "\n"
+ 'elasticsearch_document_store = ElasticsearchDocumentStore(index="my-index")\n'
+ "retri = ElasticsearchRetriever(document_store=elasticsearch_document_store, top_k=20)\n"
+ "\n"
+ "p = Pipeline()\n"
+ 'p.add_node(component=retri, name="retri", inputs=["Query"])'
+ )
+
+
+def test_PipelineCodeGen_order_components():
+ dependency_map = {"a": ["aa", "ab"], "aa": [], "ab": ["aba"], "aba": [], "b": ["a", "c"], "c": ["a"]}
+ ordered = _PipelineCodeGen._order_components(dependency_map=dependency_map)
+ assert ordered == ["aa", "aba", "ab", "a", "c", "b"]
+
+
+@pytest.mark.parametrize("input", ["\btest", " test", "#test", "+test", "\ttest", "\ntest", "test()"])
+def test_PipelineCodeGen_validate_user_input_invalid(input):
+ with pytest.raises(ValueError):
+ _PipelineCodeGen._validate_user_input(input)
+
+
+@pytest.mark.parametrize(
+ "input", ["test", "testName", "test_name", "test-name", "test-name1234", "http://localhost:8000/my-path"]
+)
+def test_PipelineCodeGen_validate_user_input_valid(input):
+ _PipelineCodeGen._validate_user_input(input)
+
+
+def test_PipelineCodeGen_validate_pipeline_config_invalid_component_name():
+ with pytest.raises(ValueError):
+ _PipelineCodeGen._validate_config({"components": [{"name": "\btest"}]})
+
+
+def test_PipelineCodeGen_validate_pipeline_config_invalid_component_type():
+ with pytest.raises(ValueError):
+ _PipelineCodeGen._validate_config({"components": [{"name": "test", "type": "\btest"}]})
+
+
+def test_PipelineCodeGen_validate_pipeline_config_invalid_component_param():
+ with pytest.raises(ValueError):
+ _PipelineCodeGen._validate_config(
+ {"components": [{"name": "test", "type": "test", "params": {"key": "\btest"}}]}
+ )
+
+
+def test_PipelineCodeGen_validate_pipeline_config_invalid_component_param_key():
+ with pytest.raises(ValueError):
+ _PipelineCodeGen._validate_config(
+ {"components": [{"name": "test", "type": "test", "params": {"\btest": "test"}}]}
+ )
+
+
+def test_PipelineCodeGen_validate_pipeline_config_invalid_pipeline_name():
+ with pytest.raises(ValueError):
+ _PipelineCodeGen._validate_config(
+ {
+ "components": [
+ {
+ "name": "test",
+ "type": "test",
+ }
+ ],
+ "pipelines": [{"name": "\btest"}],
+ }
+ )
+
+
+def test_PipelineCodeGen_validate_pipeline_config_invalid_pipeline_type():
+ with pytest.raises(ValueError):
+ _PipelineCodeGen._validate_config(
+ {
+ "components": [
+ {
+ "name": "test",
+ "type": "test",
+ }
+ ],
+ "pipelines": [{"name": "test", "type": "\btest"}],
+ }
+ )
+
+
+def test_PipelineCodeGen_validate_pipeline_config_invalid_pipeline_node_name():
+ with pytest.raises(ValueError):
+ _PipelineCodeGen._validate_config(
+ {
+ "components": [
+ {
+ "name": "test",
+ "type": "test",
+ }
+ ],
+ "pipelines": [{"name": "test", "type": "test", "nodes": [{"name": "\btest"}]}],
+ }
+ )
+
+
+def test_PipelineCodeGen_validate_pipeline_config_invalid_pipeline_node_inputs():
+ with pytest.raises(ValueError):
+ _PipelineCodeGen._validate_config(
+ {
+ "components": [
+ {
+ "name": "test",
+ "type": "test",
+ }
+ ],
+ "pipelines": [{"name": "test", "type": "test", "nodes": [{"name": "test", "inputs": ["\btest"]}]}],
+ }
+ )
+
+
@pytest.mark.usefixtures(deepset_cloud_fixture.__name__)
@responses.activate
def test_load_from_deepset_cloud_query():