From 98f62a054c336c56ecfd39788e4348ff8da885c3 Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Fri, 17 Jan 2025 10:25:49 +0100 Subject: [PATCH 01/23] Moved YAML-deployment test files in test_files/yaml folder --- tests/test_files/{ => yaml}/broken_rag_pipeline.yml | 0 .../{ => yaml}/working_pipelines/basic_rag_pipeline.yml | 0 .../{ => yaml}/working_pipelines/chat_with_website.yml | 0 .../{ => yaml}/working_pipelines/minimal_retriever.yml | 0 .../{ => yaml}/working_pipelines/pipeline_qdrant.yml | 0 .../{ => yaml}/working_pipelines/pipeline_qdrant_2.yml | 0 .../test_files/{ => yaml}/working_pipelines/st_retriever.yml | 0 .../{ => yaml}/working_pipelines/test_pipeline_01.yml | 0 .../{ => yaml}/working_pipelines/test_pipeline_02.yml | 0 tests/test_it_deploy.py | 4 ++-- tests/test_it_draw.py | 2 +- tests/test_it_handling_deploy_exceptions.py | 2 +- tests/test_it_status.py | 2 +- tests/test_registry.py | 2 +- 14 files changed, 6 insertions(+), 6 deletions(-) rename tests/test_files/{ => yaml}/broken_rag_pipeline.yml (100%) rename tests/test_files/{ => yaml}/working_pipelines/basic_rag_pipeline.yml (100%) rename tests/test_files/{ => yaml}/working_pipelines/chat_with_website.yml (100%) rename tests/test_files/{ => yaml}/working_pipelines/minimal_retriever.yml (100%) rename tests/test_files/{ => yaml}/working_pipelines/pipeline_qdrant.yml (100%) rename tests/test_files/{ => yaml}/working_pipelines/pipeline_qdrant_2.yml (100%) rename tests/test_files/{ => yaml}/working_pipelines/st_retriever.yml (100%) rename tests/test_files/{ => yaml}/working_pipelines/test_pipeline_01.yml (100%) rename tests/test_files/{ => yaml}/working_pipelines/test_pipeline_02.yml (100%) diff --git a/tests/test_files/broken_rag_pipeline.yml b/tests/test_files/yaml/broken_rag_pipeline.yml similarity index 100% rename from tests/test_files/broken_rag_pipeline.yml rename to tests/test_files/yaml/broken_rag_pipeline.yml diff --git a/tests/test_files/working_pipelines/basic_rag_pipeline.yml b/tests/test_files/yaml/working_pipelines/basic_rag_pipeline.yml similarity index 100% rename from tests/test_files/working_pipelines/basic_rag_pipeline.yml rename to tests/test_files/yaml/working_pipelines/basic_rag_pipeline.yml diff --git a/tests/test_files/working_pipelines/chat_with_website.yml b/tests/test_files/yaml/working_pipelines/chat_with_website.yml similarity index 100% rename from tests/test_files/working_pipelines/chat_with_website.yml rename to tests/test_files/yaml/working_pipelines/chat_with_website.yml diff --git a/tests/test_files/working_pipelines/minimal_retriever.yml b/tests/test_files/yaml/working_pipelines/minimal_retriever.yml similarity index 100% rename from tests/test_files/working_pipelines/minimal_retriever.yml rename to tests/test_files/yaml/working_pipelines/minimal_retriever.yml diff --git a/tests/test_files/working_pipelines/pipeline_qdrant.yml b/tests/test_files/yaml/working_pipelines/pipeline_qdrant.yml similarity index 100% rename from tests/test_files/working_pipelines/pipeline_qdrant.yml rename to tests/test_files/yaml/working_pipelines/pipeline_qdrant.yml diff --git a/tests/test_files/working_pipelines/pipeline_qdrant_2.yml b/tests/test_files/yaml/working_pipelines/pipeline_qdrant_2.yml similarity index 100% rename from tests/test_files/working_pipelines/pipeline_qdrant_2.yml rename to tests/test_files/yaml/working_pipelines/pipeline_qdrant_2.yml diff --git a/tests/test_files/working_pipelines/st_retriever.yml b/tests/test_files/yaml/working_pipelines/st_retriever.yml similarity index 100% rename from tests/test_files/working_pipelines/st_retriever.yml rename to tests/test_files/yaml/working_pipelines/st_retriever.yml diff --git a/tests/test_files/working_pipelines/test_pipeline_01.yml b/tests/test_files/yaml/working_pipelines/test_pipeline_01.yml similarity index 100% rename from tests/test_files/working_pipelines/test_pipeline_01.yml rename to tests/test_files/yaml/working_pipelines/test_pipeline_01.yml diff --git a/tests/test_files/working_pipelines/test_pipeline_02.yml b/tests/test_files/yaml/working_pipelines/test_pipeline_02.yml similarity index 100% rename from tests/test_files/working_pipelines/test_pipeline_02.yml rename to tests/test_files/yaml/working_pipelines/test_pipeline_02.yml diff --git a/tests/test_it_deploy.py b/tests/test_it_deploy.py index d980c22..cd5fffb 100644 --- a/tests/test_it_deploy.py +++ b/tests/test_it_deploy.py @@ -13,7 +13,7 @@ def clear_registry(): # Load pipeline definitions from test_files -test_files = Path(__file__).parent / "test_files" / "working_pipelines" +test_files = Path(__file__).parent / "test_files/yaml" / "working_pipelines" pipeline_data = [{"name": file.stem, "source_code": file.read_text()} for file in test_files.glob("*.yml")] @@ -30,7 +30,7 @@ def test_deploy_pipeline_def(deploy_pipeline, status_pipeline, pipeline_data: di def test_undeploy_pipeline_def(deploy_pipeline, undeploy_pipeline, status_pipeline): - pipeline_file = Path(__file__).parent / "test_files" / "working_pipelines/test_pipeline_01.yml" + pipeline_file = Path(__file__).parent / "test_files/yaml" / "working_pipelines/test_pipeline_01.yml" pipeline_data = {"name": pipeline_file.stem, "source_code": pipeline_file.read_text()} deploy_response = deploy_pipeline(client, pipeline_data["name"], pipeline_data["source_code"]) diff --git a/tests/test_it_draw.py b/tests/test_it_draw.py index fb71776..acc08f0 100644 --- a/tests/test_it_draw.py +++ b/tests/test_it_draw.py @@ -6,7 +6,7 @@ def test_draw_pipeline(deploy_pipeline, draw_pipeline): - pipeline_file = Path(__file__).parent / "test_files" / "working_pipelines/test_pipeline_01.yml" + pipeline_file = Path(__file__).parent / "test_files/yaml" / "working_pipelines/test_pipeline_01.yml" pipeline_data = {"name": pipeline_file.stem, "source_code": pipeline_file.read_text()} deploy_pipeline(client, pipeline_data["name"], pipeline_data["source_code"]) diff --git a/tests/test_it_handling_deploy_exceptions.py b/tests/test_it_handling_deploy_exceptions.py index 46b2766..79fc66b 100644 --- a/tests/test_it_handling_deploy_exceptions.py +++ b/tests/test_it_handling_deploy_exceptions.py @@ -7,7 +7,7 @@ def test_gracefully_handle_deploy_exception(deploy_pipeline): pipeline_name = "broken_rag_pipeline" - pipeline_def = (Path(__file__).parent / "test_files" / "broken_rag_pipeline.yml").read_text() + pipeline_def = (Path(__file__).parent / "test_files/yaml" / "broken_rag_pipeline.yml").read_text() deploy_response = deploy_pipeline(client, pipeline_name, pipeline_def) assert deploy_response.status_code == 500 diff --git a/tests/test_it_status.py b/tests/test_it_status.py index e34eda1..1b8220c 100644 --- a/tests/test_it_status.py +++ b/tests/test_it_status.py @@ -18,7 +18,7 @@ def test_status_all_pipelines(status_pipeline): def test_status_single_pipeline(deploy_pipeline, status_pipeline): - pipeline_file = Path(__file__).parent / "test_files" / "working_pipelines/test_pipeline_01.yml" + pipeline_file = Path(__file__).parent / "test_files/yaml" / "working_pipelines/test_pipeline_01.yml" pipeline_data = {"name": pipeline_file.stem, "source_code": pipeline_file.read_text()} deploy_response = deploy_pipeline(client, pipeline_data["name"], pipeline_data["source_code"]) diff --git a/tests/test_registry.py b/tests/test_registry.py index 6468cf2..a568741 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -13,7 +13,7 @@ def pipeline_registry(): @pytest.fixture def sample_pipeline_yaml(): - return (Path(__file__).parent / "test_files" / "working_pipelines" / "basic_rag_pipeline.yml").read_text() + return (Path(__file__).parent / "test_files/yaml" / "working_pipelines" / "basic_rag_pipeline.yml").read_text() def test_add_pipeline(pipeline_registry, sample_pipeline_yaml): From b59906158362af2c49b021a03a6453d08c760db8 Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Mon, 20 Jan 2025 17:04:39 +0100 Subject: [PATCH 02/23] Add save_pipeline_files and load_pipeline_module deploy utils --- src/hayhooks/server/exceptions.py | 3 + .../server/utils/base_pipeline_wrapper.py | 43 ++++++++++ src/hayhooks/server/utils/deploy_utils.py | 86 ++++++++++++++++++- tests/test_deploy_utils.py | 78 +++++++++++++++++ .../chat_with_website/chat_with_website.yml | 50 +++++++++++ .../chat_with_website/pipeline_wrapper.py | 23 +++++ .../python/no_wrapper/chat_with_website.yml | 50 +++++++++++ 7 files changed, 332 insertions(+), 1 deletion(-) create mode 100644 src/hayhooks/server/exceptions.py create mode 100644 src/hayhooks/server/utils/base_pipeline_wrapper.py create mode 100644 tests/test_deploy_utils.py create mode 100644 tests/test_files/python/chat_with_website/chat_with_website.yml create mode 100644 tests/test_files/python/chat_with_website/pipeline_wrapper.py create mode 100644 tests/test_files/python/no_wrapper/chat_with_website.yml diff --git a/src/hayhooks/server/exceptions.py b/src/hayhooks/server/exceptions.py new file mode 100644 index 0000000..5dc800f --- /dev/null +++ b/src/hayhooks/server/exceptions.py @@ -0,0 +1,3 @@ +class PipelineFilesError(Exception): + """Exception for errors saving pipeline files.""" + pass diff --git a/src/hayhooks/server/utils/base_pipeline_wrapper.py b/src/hayhooks/server/utils/base_pipeline_wrapper.py new file mode 100644 index 0000000..5f5a3f2 --- /dev/null +++ b/src/hayhooks/server/utils/base_pipeline_wrapper.py @@ -0,0 +1,43 @@ +from abc import ABC, abstractmethod +from typing import List + + +class BasePipelineWrapper(ABC): + @abstractmethod + def setup(self) -> None: + """ + Setup the pipeline. + + This method should be called before using the pipeline. + """ + pass + + @abstractmethod + def run_api(self, urls: List[str], question: str) -> dict: + """ + Run the pipeline in API mode. + + Args: + urls: List of URLs to fetch content from + question: Question to be answered + + Returns: + dict: Pipeline execution results + """ + pass + + @abstractmethod + def run_chat(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> dict: + """ + Run the pipeline in chat mode. + + Args: + user_message: Message from the user + model_id: ID of the model to use + messages: List of previous messages + body: Additional request body parameters + + Returns: + dict: Pipeline execution results + """ + pass diff --git a/src/hayhooks/server/utils/deploy_utils.py b/src/hayhooks/server/utils/deploy_utils.py index 44ac36f..2545af1 100644 --- a/src/hayhooks/server/utils/deploy_utils.py +++ b/src/hayhooks/server/utils/deploy_utils.py @@ -1,7 +1,10 @@ +import tempfile +import importlib.util from fastapi import HTTPException from fastapi.concurrency import run_in_threadpool from fastapi.responses import JSONResponse - +from pathlib import Path +from hayhooks.server.exceptions import PipelineFilesError from hayhooks.server.pipelines import registry from hayhooks.server.pipelines.models import ( PipelineDefinition, @@ -9,6 +12,8 @@ get_request_model, get_response_model, ) +from hayhooks.server.logger import log +from hayhooks.settings import settings def deploy_pipeline_def(app, pipeline_def: PipelineDefinition): @@ -45,3 +50,82 @@ async def pipeline_run(pipeline_run_req: PipelineRunRequest) -> JSONResponse: # app.setup() return {"name": pipeline_def.name} + + +def save_pipeline_files( + pipeline_name: str, files: dict[str, str], pipelines_dir: str = settings.pipelines_dir +) -> dict[str, str]: + """Save pipeline files to disk and return their paths. + + Args: + pipeline_name: Name of the pipeline + files: Dictionary mapping filenames to their contents + + Returns: + Dictionary mapping filenames to their saved paths + + Raises: + PipelineFilesError: If there are any issues saving the files + """ + try: + # Create pipeline directory under the configured pipelines directory + pipeline_dir = Path(pipelines_dir) / pipeline_name + pipeline_dir.mkdir(parents=True, exist_ok=True) + saved_files = {} + + for filename, content in files.items(): + file_path = pipeline_dir / filename + + # Create parent directories if they don't exist + file_path.parent.mkdir(parents=True, exist_ok=True) + + # Save file content + file_path.write_text(content) + saved_files[filename] = str(file_path) + + return saved_files + + except Exception as e: + raise PipelineFilesError(f"Failed to save pipeline files: {str(e)}") from e + + +def load_pipeline_module(pipeline_name: str, folder_path: Path | str): + """Load a pipeline module from a folder path. + + Args: + pipeline_name: Name of the pipeline + folder_path: Path to the folder containing the pipeline files + + Returns: + The loaded module + + Raises: + ValueError: If the module cannot be loaded + """ + log.debug(f"Loading pipeline module spec for {pipeline_name}") + try: + folder_path = Path(folder_path) + wrapper_path = folder_path / "pipeline_wrapper.py" + + if not wrapper_path.exists(): + raise ValueError(f"Required file '{wrapper_path}' not found") + + spec = importlib.util.spec_from_file_location(pipeline_name, wrapper_path) + if spec is None: + raise ValueError(f"Failed to load '{pipeline_name}' pipeline module spec") + + module = importlib.util.module_from_spec(spec) + if spec.loader is None: + raise ValueError(f"Failed to load '{pipeline_name}' pipeline module") + + spec.loader.exec_module(module) + log.debug(f"Loaded module {module}") + + if not hasattr(module, "PipelineWrapper"): + raise ValueError(f"Failed to load '{pipeline_name}' pipeline module spec") + + return module + + except Exception as e: + log.error(f"Error loading pipeline module: {str(e)}") + raise ValueError(f"Failed to load pipeline module: {str(e)}") from e diff --git a/tests/test_deploy_utils.py b/tests/test_deploy_utils.py new file mode 100644 index 0000000..935a608 --- /dev/null +++ b/tests/test_deploy_utils.py @@ -0,0 +1,78 @@ +import pytest +import shutil +from pathlib import Path +from typing import Callable +from hayhooks.server.utils.deploy_utils import load_pipeline_module, save_pipeline_files +from hayhooks.server.exceptions import PipelineFilesError +from hayhooks.settings import settings + +TEST_PIPELINES_DIR = Path("tests/test_files/test_pipelines") + + +@pytest.fixture(autouse=True) +def cleanup_test_pipelines(): + yield + + if TEST_PIPELINES_DIR.exists(): + shutil.rmtree(TEST_PIPELINES_DIR) + + +def test_load_pipeline_module(): + pipeline_name = "chat_with_website" + pipeline_folder_path = Path("tests/test_files/python/chat_with_website") + + module = load_pipeline_module(pipeline_name, pipeline_folder_path) + + assert module is not None + assert hasattr(module, "PipelineWrapper") + assert isinstance(getattr(module.PipelineWrapper, "run_api"), Callable) + assert isinstance(getattr(module.PipelineWrapper, "run_chat"), Callable) + assert isinstance(getattr(module.PipelineWrapper, "setup"), Callable) + + +def test_load_pipeline_wrong_folder(): + pipeline_name = "chat_with_website" + pipeline_folder_path = Path("tests/test_files/python/wrong_folder") + + with pytest.raises( + ValueError, + match="Required file 'tests/test_files/python/wrong_folder/pipeline_wrapper.py' not found", + ): + load_pipeline_module(pipeline_name, pipeline_folder_path) + + +def test_load_pipeline_no_wrapper(): + pipeline_name = "chat_with_website" + pipeline_folder_path = Path("tests/test_files/python/no_wrapper") + + with pytest.raises( + ValueError, + match="Required file 'tests/test_files/python/no_wrapper/pipeline_wrapper.py' not found", + ): + load_pipeline_module(pipeline_name, pipeline_folder_path) + + +def test_save_pipeline_files_basic(): + files = { + "pipeline_wrapper.py": "print('hello')", + "extra_file.txt": "extra content", + } + + saved_paths = save_pipeline_files("test_pipeline", files, pipelines_dir=TEST_PIPELINES_DIR) + + assert len(saved_paths) == 2 + for filename, path in saved_paths.items(): + assert Path(path).exists() + assert Path(path).read_text() == files[filename] + + +def test_save_pipeline_files_empty(): + pipeline_name = "test_pipeline" + files = {} + + saved_paths = save_pipeline_files(pipeline_name, files, pipelines_dir=TEST_PIPELINES_DIR) + + assert len(saved_paths) == 0 + assert (TEST_PIPELINES_DIR / pipeline_name).exists() + assert (TEST_PIPELINES_DIR / pipeline_name).is_dir() + assert len([file for file in (TEST_PIPELINES_DIR / pipeline_name).iterdir()]) == 0 diff --git a/tests/test_files/python/chat_with_website/chat_with_website.yml b/tests/test_files/python/chat_with_website/chat_with_website.yml new file mode 100644 index 0000000..db4063f --- /dev/null +++ b/tests/test_files/python/chat_with_website/chat_with_website.yml @@ -0,0 +1,50 @@ +components: + converter: + type: haystack.components.converters.html.HTMLToDocument + init_parameters: + extraction_kwargs: null + + fetcher: + init_parameters: + raise_on_failure: true + retry_attempts: 2 + timeout: 3 + user_agents: + - haystack/LinkContentFetcher/2.0.0b8 + type: haystack.components.fetchers.link_content.LinkContentFetcher + + llm: + init_parameters: + api_base_url: null + api_key: + env_vars: + - OPENAI_API_KEY + strict: true + type: env_var + generation_kwargs: {} + model: gpt-4o-mini + streaming_callback: null + system_prompt: null + type: haystack.components.generators.openai.OpenAIGenerator + + prompt: + init_parameters: + template: | + "According to the contents of this website: + {% for document in documents %} + {{document.content}} + {% endfor %} + Answer the given question: {{query}} + Answer: + " + type: haystack.components.builders.prompt_builder.PromptBuilder + +connections: + - receiver: converter.sources + sender: fetcher.streams + - receiver: prompt.documents + sender: converter.documents + - receiver: llm.prompt + sender: prompt.prompt + +metadata: {} diff --git a/tests/test_files/python/chat_with_website/pipeline_wrapper.py b/tests/test_files/python/chat_with_website/pipeline_wrapper.py new file mode 100644 index 0000000..5ef72f0 --- /dev/null +++ b/tests/test_files/python/chat_with_website/pipeline_wrapper.py @@ -0,0 +1,23 @@ +from pathlib import Path +from typing import List +from haystack import Pipeline +from hayhooks.server.utils.base_pipeline_wrapper import BasePipelineWrapper + + +URLS = ["https://haystack.deepset.ai", "https://www.redis.io"] + + +class PipelineWrapper(BasePipelineWrapper): + def __init__(self) -> None: + self.pipeline: Pipeline = None + + def setup(self) -> None: + pipeline_yaml = (Path(__file__).parent / "chat_with_website.yml").read_text() + self.pipeline = Pipeline.loads(pipeline_yaml) + + def run_api(self, urls: List[str], question: str) -> dict: + return self.pipeline.run({"fetcher": {"urls": urls}, "prompt": {"query": question}}) + + def run_chat(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> dict: + question = user_message + return self.pipeline.run({"fetcher": {"urls": URLS}, "prompt": {"query": question}}) diff --git a/tests/test_files/python/no_wrapper/chat_with_website.yml b/tests/test_files/python/no_wrapper/chat_with_website.yml new file mode 100644 index 0000000..db4063f --- /dev/null +++ b/tests/test_files/python/no_wrapper/chat_with_website.yml @@ -0,0 +1,50 @@ +components: + converter: + type: haystack.components.converters.html.HTMLToDocument + init_parameters: + extraction_kwargs: null + + fetcher: + init_parameters: + raise_on_failure: true + retry_attempts: 2 + timeout: 3 + user_agents: + - haystack/LinkContentFetcher/2.0.0b8 + type: haystack.components.fetchers.link_content.LinkContentFetcher + + llm: + init_parameters: + api_base_url: null + api_key: + env_vars: + - OPENAI_API_KEY + strict: true + type: env_var + generation_kwargs: {} + model: gpt-4o-mini + streaming_callback: null + system_prompt: null + type: haystack.components.generators.openai.OpenAIGenerator + + prompt: + init_parameters: + template: | + "According to the contents of this website: + {% for document in documents %} + {{document.content}} + {% endfor %} + Answer the given question: {{query}} + Answer: + " + type: haystack.components.builders.prompt_builder.PromptBuilder + +connections: + - receiver: converter.sources + sender: fetcher.streams + - receiver: prompt.documents + sender: converter.documents + - receiver: llm.prompt + sender: prompt.prompt + +metadata: {} From b200a1884927a69c8b75ec2f93a19eb002d23fb1 Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Mon, 20 Jan 2025 17:29:41 +0100 Subject: [PATCH 03/23] Add test ; cleanup --- tests/test_deploy_utils.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/tests/test_deploy_utils.py b/tests/test_deploy_utils.py index 935a608..7e32c1a 100644 --- a/tests/test_deploy_utils.py +++ b/tests/test_deploy_utils.py @@ -4,7 +4,6 @@ from typing import Callable from hayhooks.server.utils.deploy_utils import load_pipeline_module, save_pipeline_files from hayhooks.server.exceptions import PipelineFilesError -from hayhooks.settings import settings TEST_PIPELINES_DIR = Path("tests/test_files/test_pipelines") @@ -76,3 +75,16 @@ def test_save_pipeline_files_empty(): assert (TEST_PIPELINES_DIR / pipeline_name).exists() assert (TEST_PIPELINES_DIR / pipeline_name).is_dir() assert len([file for file in (TEST_PIPELINES_DIR / pipeline_name).iterdir()]) == 0 + + +def test_save_pipeline_files_raises_error(tmp_path): + readonly_dir = tmp_path / "readonly" + readonly_dir.mkdir() + readonly_dir.chmod(0o444) + + files = {"test.py": "print('hello')"} + + with pytest.raises(PipelineFilesError) as exc_info: + save_pipeline_files(pipeline_name="test_pipeline", files=files, pipelines_dir=str(readonly_dir)) + + assert "Failed to save pipeline files" in str(exc_info.value) From fd9ce6d8911ed74862fa18193b86f58134a76aa7 Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Mon, 20 Jan 2025 21:28:37 +0100 Subject: [PATCH 04/23] Add self.pipeline in ABC --- src/hayhooks/server/utils/base_pipeline_wrapper.py | 3 +++ tests/test_files/python/chat_with_website/pipeline_wrapper.py | 3 --- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/hayhooks/server/utils/base_pipeline_wrapper.py b/src/hayhooks/server/utils/base_pipeline_wrapper.py index 5f5a3f2..87a3b75 100644 --- a/src/hayhooks/server/utils/base_pipeline_wrapper.py +++ b/src/hayhooks/server/utils/base_pipeline_wrapper.py @@ -3,6 +3,9 @@ class BasePipelineWrapper(ABC): + def __init__(self): + self.pipeline = None + @abstractmethod def setup(self) -> None: """ diff --git a/tests/test_files/python/chat_with_website/pipeline_wrapper.py b/tests/test_files/python/chat_with_website/pipeline_wrapper.py index 5ef72f0..bb3f68a 100644 --- a/tests/test_files/python/chat_with_website/pipeline_wrapper.py +++ b/tests/test_files/python/chat_with_website/pipeline_wrapper.py @@ -8,9 +8,6 @@ class PipelineWrapper(BasePipelineWrapper): - def __init__(self) -> None: - self.pipeline: Pipeline = None - def setup(self) -> None: pipeline_yaml = (Path(__file__).parent / "chat_with_website.yml").read_text() self.pipeline = Pipeline.loads(pipeline_yaml) From 430734642612cd57662680c7ba8658612b2452fd Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Mon, 20 Jan 2025 21:29:04 +0100 Subject: [PATCH 05/23] Update registry to support BasePipelineWrapper instances ; update draw route --- src/hayhooks/server/pipelines/registry.py | 32 ++++++---- src/hayhooks/server/routers/draw.py | 2 +- tests/test_it_draw.py | 26 ++++++++ tests/test_registry.py | 78 +++++++++++++++++++++++ 4 files changed, 126 insertions(+), 12 deletions(-) diff --git a/src/hayhooks/server/pipelines/registry.py b/src/hayhooks/server/pipelines/registry.py index 9a58fae..c44cafd 100644 --- a/src/hayhooks/server/pipelines/registry.py +++ b/src/hayhooks/server/pipelines/registry.py @@ -1,32 +1,42 @@ -from typing import Optional +from typing import Optional, Union from haystack import Pipeline from haystack.core.errors import PipelineError +from hayhooks.server.utils.base_pipeline_wrapper import BasePipelineWrapper + +PipelineType = Union[Pipeline, BasePipelineWrapper] class _PipelineRegistry: def __init__(self) -> None: - self._pipelines: dict[str, Pipeline] = {} + self._pipelines: dict[str, PipelineType] = {} - def add(self, name: str, source: str) -> Pipeline: + def add(self, name: str, source_or_pipeline: Union[str, PipelineType]) -> PipelineType: if name in self._pipelines: msg = f"A pipeline with name {name} is already in the registry." raise ValueError(msg) - try: - self._pipelines[name] = Pipeline.loads(source) - except PipelineError as e: - msg = f"Unable to parse Haystack Pipeline {name}: {e}" - raise ValueError(msg) from e + if isinstance(source_or_pipeline, (Pipeline, BasePipelineWrapper)): + pipeline = source_or_pipeline + else: + try: + pipeline = Pipeline.loads(source_or_pipeline) + except PipelineError as e: + msg = f"Unable to parse Haystack Pipeline {name}: {e}" + raise ValueError(msg) from e - return self._pipelines[name] + self._pipelines[name] = pipeline + return pipeline def remove(self, name: str): if name in self._pipelines: del self._pipelines[name] - def get(self, name: str) -> Optional[Pipeline]: - return self._pipelines.get(name) + def get(self, name: str, use_pipeline: bool = False) -> Optional[PipelineType]: + pipeline = self._pipelines.get(name) + if use_pipeline and isinstance(pipeline, BasePipelineWrapper): + return pipeline.pipeline + return pipeline def get_names(self) -> list[str]: return list(self._pipelines.keys()) diff --git a/src/hayhooks/server/routers/draw.py b/src/hayhooks/server/routers/draw.py index bd67144..f8a56a3 100644 --- a/src/hayhooks/server/routers/draw.py +++ b/src/hayhooks/server/routers/draw.py @@ -9,7 +9,7 @@ @router.get("/draw/{pipeline_name}", tags=["config"]) async def draw(pipeline_name): - pipeline = registry.get(pipeline_name) + pipeline = registry.get(pipeline_name, use_pipeline=True) if not pipeline: raise HTTPException(status_code=404) diff --git a/tests/test_it_draw.py b/tests/test_it_draw.py index acc08f0..b297837 100644 --- a/tests/test_it_draw.py +++ b/tests/test_it_draw.py @@ -1,6 +1,10 @@ from fastapi.testclient import TestClient from hayhooks.server import app from pathlib import Path +from typing import List +from haystack import Pipeline +from hayhooks.server.utils.base_pipeline_wrapper import BasePipelineWrapper +from hayhooks.server.pipelines import registry client = TestClient(app) @@ -21,3 +25,25 @@ def test_draw_pipeline(deploy_pipeline, draw_pipeline): def test_draw_non_existent_pipeline(draw_pipeline): draw_response = draw_pipeline(client, "non_existent_pipeline") assert draw_response.status_code == 404 + + +def test_draw_pipeline_wrapper(deploy_pipeline, draw_pipeline): + class TestPipelineWrapper(BasePipelineWrapper): + def setup(self) -> None: + pipeline_file = Path(__file__).parent / "test_files/yaml" / "working_pipelines/test_pipeline_01.yml" + self.pipeline = Pipeline.loads(pipeline_file.read_text()) + + def run_api(self, urls: List[str], question: str) -> dict: + return {} + + def run_chat(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> dict: + return {} + + wrapper = TestPipelineWrapper() + wrapper.setup() + registry.add("test_wrapper", wrapper) + + draw_response = draw_pipeline(client, "test_wrapper") + assert draw_response.status_code == 200 + assert draw_response.headers["Content-Type"] == "image/png" + assert len(draw_response.content) > 0 diff --git a/tests/test_registry.py b/tests/test_registry.py index a568741..549fc43 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -2,8 +2,10 @@ from pathlib import Path from haystack import Pipeline from haystack.core.errors import PipelineError +from typing import List from hayhooks.server.pipelines.registry import _PipelineRegistry +from hayhooks.server.utils.base_pipeline_wrapper import BasePipelineWrapper @pytest.fixture @@ -16,6 +18,21 @@ def sample_pipeline_yaml(): return (Path(__file__).parent / "test_files/yaml" / "working_pipelines" / "basic_rag_pipeline.yml").read_text() +@pytest.fixture +def test_pipeline_wrapper_class(): + class TestPipelineWrapper(BasePipelineWrapper): + def setup(self) -> None: + pass + + def run_api(self, urls: List[str], question: str) -> dict: + return {} + + def run_chat(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> dict: + return {} + + return TestPipelineWrapper + + def test_add_pipeline(pipeline_registry, sample_pipeline_yaml): result = pipeline_registry.add("test_pipeline", sample_pipeline_yaml) @@ -64,3 +81,64 @@ def test_get_names(pipeline_registry, sample_pipeline_yaml, mocker): names = pipeline_registry.get_names() assert sorted(names) == ["pipeline1", "pipeline2"] + + +def test_add_pipeline_instance(pipeline_registry): + pipeline = Pipeline() # Create a simple pipeline instance + result = pipeline_registry.add("test_pipeline", pipeline) + + assert result == pipeline + assert pipeline_registry.get("test_pipeline") == pipeline + + +def test_add_duplicate_pipeline_instance(pipeline_registry): + pipeline = Pipeline() # Create a simple pipeline instance + pipeline_registry.add("test_pipeline", pipeline) + + with pytest.raises(ValueError, match="A pipeline with name test_pipeline is already in the registry"): + pipeline_registry.add("test_pipeline", pipeline) + + +def test_add_pipeline_wrapper(pipeline_registry, test_pipeline_wrapper_class): + wrapper = test_pipeline_wrapper_class() + result = pipeline_registry.add("test_wrapper", wrapper) + + assert result == wrapper + assert pipeline_registry.get("test_wrapper") == wrapper + + +def test_add_duplicate_pipeline_wrapper(pipeline_registry, test_pipeline_wrapper_class): + wrapper = test_pipeline_wrapper_class() + pipeline_registry.add("test_wrapper", wrapper) + + with pytest.raises(ValueError, match="A pipeline with name test_wrapper is already in the registry"): + pipeline_registry.add("test_wrapper", wrapper) + + +def test_get_pipeline_wrapper_with_pipeline(pipeline_registry, test_pipeline_wrapper_class): + class TestPipelineWrapperWithPipeline(test_pipeline_wrapper_class): + def setup(self) -> None: + self.pipeline = Pipeline() # Create a simple pipeline instance + + wrapper = TestPipelineWrapperWithPipeline() + wrapper.setup() + pipeline_registry.add("test_wrapper", wrapper) + + assert pipeline_registry.get("test_wrapper") == wrapper + assert pipeline_registry.get("test_wrapper", use_pipeline=True) == wrapper.pipeline + + +def test_get_pipeline_wrapper_without_pipeline(pipeline_registry, test_pipeline_wrapper_class): + wrapper = test_pipeline_wrapper_class() + pipeline_registry.add("test_wrapper", wrapper) + + assert pipeline_registry.get("test_wrapper") == wrapper + assert pipeline_registry.get("test_wrapper", use_pipeline=True) is None + + +def test_get_regular_pipeline_with_use_pipeline(pipeline_registry): + pipeline = Pipeline() + pipeline_registry.add("test_pipeline", pipeline) + + assert pipeline_registry.get("test_pipeline") == pipeline + assert pipeline_registry.get("test_pipeline", use_pipeline=True) == pipeline From 597fa5c9087d09ad05fdc2f4ecd5cdba36c76352 Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Mon, 20 Jan 2025 21:49:56 +0100 Subject: [PATCH 06/23] Fix for python 3.9 --- src/hayhooks/server/utils/deploy_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/hayhooks/server/utils/deploy_utils.py b/src/hayhooks/server/utils/deploy_utils.py index 2545af1..5ccf203 100644 --- a/src/hayhooks/server/utils/deploy_utils.py +++ b/src/hayhooks/server/utils/deploy_utils.py @@ -1,5 +1,5 @@ -import tempfile import importlib.util +from typing import Union from fastapi import HTTPException from fastapi.concurrency import run_in_threadpool from fastapi.responses import JSONResponse @@ -89,7 +89,7 @@ def save_pipeline_files( raise PipelineFilesError(f"Failed to save pipeline files: {str(e)}") from e -def load_pipeline_module(pipeline_name: str, folder_path: Path | str): +def load_pipeline_module(pipeline_name: str, folder_path: Union[Path, str]): """Load a pipeline module from a folder path. Args: From 5fb3e290d6c4c2120352927468e1f20a9210d29e Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Thu, 23 Jan 2025 10:10:59 +0100 Subject: [PATCH 07/23] Add custom formetter to print out extra info when using .bind() --- src/hayhooks/server/logger.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/hayhooks/server/logger.py b/src/hayhooks/server/logger.py index 088458e..5fd3738 100644 --- a/src/hayhooks/server/logger.py +++ b/src/hayhooks/server/logger.py @@ -1,8 +1,18 @@ -# logger.py - import os import sys from loguru import logger as log + +def formatter(record): + if record["extra"]: + return "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} | {message}\n{extra}\n" + + return "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} | {message}\n" + + log.remove() -log.add(sys.stderr, level=os.getenv("LOG", "INFO").upper()) +log.add( + sys.stderr, + level=os.getenv("LOG", "INFO").upper(), + format=formatter +) From d135c380f66da2bd88bb90e0804906f317cd3e37 Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Thu, 23 Jan 2025 10:17:45 +0100 Subject: [PATCH 08/23] fixed registry tests ; remove .get() use_pipeline arg --- src/hayhooks/server/pipelines/registry.py | 7 +-- tests/test_registry.py | 54 +++++++++-------------- 2 files changed, 22 insertions(+), 39 deletions(-) diff --git a/src/hayhooks/server/pipelines/registry.py b/src/hayhooks/server/pipelines/registry.py index c44cafd..b96cc89 100644 --- a/src/hayhooks/server/pipelines/registry.py +++ b/src/hayhooks/server/pipelines/registry.py @@ -32,11 +32,8 @@ def remove(self, name: str): if name in self._pipelines: del self._pipelines[name] - def get(self, name: str, use_pipeline: bool = False) -> Optional[PipelineType]: - pipeline = self._pipelines.get(name) - if use_pipeline and isinstance(pipeline, BasePipelineWrapper): - return pipeline.pipeline - return pipeline + def get(self, name: str) -> Optional[PipelineType]: + return self._pipelines.get(name) def get_names(self) -> list[str]: return list(self._pipelines.keys()) diff --git a/tests/test_registry.py b/tests/test_registry.py index 549fc43..fdfbc9b 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -36,19 +36,9 @@ def run_chat(self, user_message: str, model_id: str, messages: List[dict], body: def test_add_pipeline(pipeline_registry, sample_pipeline_yaml): result = pipeline_registry.add("test_pipeline", sample_pipeline_yaml) - expected_pipeline = Pipeline.loads(sample_pipeline_yaml) - assert result == expected_pipeline - assert pipeline_registry.get("test_pipeline") == expected_pipeline - - -def test_add_duplicate_pipeline(pipeline_registry, sample_pipeline_yaml): - result = pipeline_registry.add("test_pipeline", sample_pipeline_yaml) - - expected_pipeline = Pipeline.loads(sample_pipeline_yaml) - assert result == expected_pipeline - - with pytest.raises(ValueError, match="A pipeline with name test_pipeline is already in the registry"): - pipeline_registry.add("test_pipeline", sample_pipeline_yaml) + pipeline = Pipeline.loads(sample_pipeline_yaml) + assert result == pipeline + assert pipeline_registry.get("test_pipeline") == pipeline def test_add_invalid_pipeline(pipeline_registry, mocker): @@ -83,16 +73,8 @@ def test_get_names(pipeline_registry, sample_pipeline_yaml, mocker): assert sorted(names) == ["pipeline1", "pipeline2"] -def test_add_pipeline_instance(pipeline_registry): - pipeline = Pipeline() # Create a simple pipeline instance - result = pipeline_registry.add("test_pipeline", pipeline) - - assert result == pipeline - assert pipeline_registry.get("test_pipeline") == pipeline - - def test_add_duplicate_pipeline_instance(pipeline_registry): - pipeline = Pipeline() # Create a simple pipeline instance + pipeline = Pipeline() pipeline_registry.add("test_pipeline", pipeline) with pytest.raises(ValueError, match="A pipeline with name test_pipeline is already in the registry"): @@ -115,30 +97,34 @@ def test_add_duplicate_pipeline_wrapper(pipeline_registry, test_pipeline_wrapper pipeline_registry.add("test_wrapper", wrapper) -def test_get_pipeline_wrapper_with_pipeline(pipeline_registry, test_pipeline_wrapper_class): +def test_get_pipeline_wrapper(pipeline_registry, test_pipeline_wrapper_class): class TestPipelineWrapperWithPipeline(test_pipeline_wrapper_class): def setup(self) -> None: - self.pipeline = Pipeline() # Create a simple pipeline instance + self.pipeline = Pipeline() wrapper = TestPipelineWrapperWithPipeline() wrapper.setup() pipeline_registry.add("test_wrapper", wrapper) assert pipeline_registry.get("test_wrapper") == wrapper - assert pipeline_registry.get("test_wrapper", use_pipeline=True) == wrapper.pipeline -def test_get_pipeline_wrapper_without_pipeline(pipeline_registry, test_pipeline_wrapper_class): +def test_clear_registry(pipeline_registry, sample_pipeline_yaml, test_pipeline_wrapper_class): + pipeline_registry.add("pipeline1", sample_pipeline_yaml) wrapper = test_pipeline_wrapper_class() - pipeline_registry.add("test_wrapper", wrapper) + pipeline_registry.add("wrapper1", wrapper) - assert pipeline_registry.get("test_wrapper") == wrapper - assert pipeline_registry.get("test_wrapper", use_pipeline=True) is None + pipeline_registry.clear() + assert len(pipeline_registry.get_names()) == 0 + assert pipeline_registry.get("pipeline1") is None + assert pipeline_registry.get("wrapper1") is None -def test_get_regular_pipeline_with_use_pipeline(pipeline_registry): - pipeline = Pipeline() - pipeline_registry.add("test_pipeline", pipeline) +def test_remove_pipeline_preserves_others(pipeline_registry, sample_pipeline_yaml, test_pipeline_wrapper_class): + pipeline_registry.add("pipeline1", sample_pipeline_yaml) + wrapper = test_pipeline_wrapper_class() + pipeline_registry.add("wrapper1", wrapper) - assert pipeline_registry.get("test_pipeline") == pipeline - assert pipeline_registry.get("test_pipeline", use_pipeline=True) == pipeline + pipeline_registry.remove("pipeline1") + assert "pipeline1" not in pipeline_registry.get_names() + assert pipeline_registry.get("wrapper1") == wrapper From 6c9499979f53df3de67d7419f629bf4b81e64e1a Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Thu, 23 Jan 2025 10:18:30 +0100 Subject: [PATCH 09/23] add /deploy_files route and implement new deployment system methods ; update tests --- src/hayhooks/server/exceptions.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/hayhooks/server/exceptions.py b/src/hayhooks/server/exceptions.py index 5dc800f..18aa6a6 100644 --- a/src/hayhooks/server/exceptions.py +++ b/src/hayhooks/server/exceptions.py @@ -1,3 +1,20 @@ class PipelineFilesError(Exception): """Exception for errors saving pipeline files.""" + + pass + + +class PipelineWrapperError(Exception): + """Exception for errors loading pipeline wrapper.""" + + pass + + +class PipelineModuleLoadError(Exception): + """Exception for errors loading pipeline module.""" + + +class PipelineAlreadyExistsError(Exception): + """Exception for errors when a pipeline already exists.""" + pass From fddcc1f500b21bd1decd34cbbb79b05a1aa13425 Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Thu, 23 Jan 2025 10:25:45 +0100 Subject: [PATCH 10/23] Fix / update tests --- src/hayhooks/server/routers/deploy.py | 35 +++- src/hayhooks/server/routers/draw.py | 7 +- src/hayhooks/server/utils/deploy_utils.py | 162 ++++++++++++++++-- tests/test_deploy_utils.py | 45 ++++- .../chat_with_website/pipeline_wrapper.py | 10 +- tests/test_it_deploy.py | 1 + tests/test_it_deploy_files.py | 82 +++++++++ 7 files changed, 318 insertions(+), 24 deletions(-) create mode 100644 tests/test_it_deploy_files.py diff --git a/src/hayhooks/server/routers/deploy.py b/src/hayhooks/server/routers/deploy.py index 08d50e0..c5cd98f 100644 --- a/src/hayhooks/server/routers/deploy.py +++ b/src/hayhooks/server/routers/deploy.py @@ -1,9 +1,40 @@ -from fastapi import APIRouter, Request -from hayhooks.server.utils.deploy_utils import deploy_pipeline_def, PipelineDefinition +from fastapi import APIRouter, Request, HTTPException +from pydantic import BaseModel +from typing import Dict +from hayhooks.server.exceptions import PipelineAlreadyExistsError +from hayhooks.server.utils.deploy_utils import ( + deploy_pipeline_def, + PipelineDefinition, + deploy_pipeline_files, + PipelineFilesError, + PipelineModuleLoadError, + PipelineWrapperError, +) router = APIRouter() +class PipelineFilesRequest(BaseModel): + name: str + files: Dict[str, str] + + @router.post("/deploy", tags=["config"]) async def deploy(pipeline_def: PipelineDefinition, request: Request): return deploy_pipeline_def(request.app, pipeline_def) + + +@router.post("/deploy_files", tags=["config"]) +async def deploy_files(pipeline_files: PipelineFilesRequest, request: Request): + try: + return deploy_pipeline_files(request.app, pipeline_files.name, pipeline_files.files) + except PipelineFilesError as e: + raise HTTPException(status_code=500, detail=str(e)) + except PipelineModuleLoadError as e: + raise HTTPException(status_code=422, detail=str(e)) + except PipelineWrapperError as e: + raise HTTPException(status_code=422, detail=str(e)) + except PipelineAlreadyExistsError as e: + raise HTTPException(status_code=409, detail=str(e)) + except Exception as e: + raise HTTPException(status_code=500, detail=f"Unexpected error deploying pipeline: {str(e)}") diff --git a/src/hayhooks/server/routers/draw.py b/src/hayhooks/server/routers/draw.py index f8a56a3..507481a 100644 --- a/src/hayhooks/server/routers/draw.py +++ b/src/hayhooks/server/routers/draw.py @@ -3,13 +3,18 @@ from fastapi import APIRouter, HTTPException from fastapi.responses import FileResponse from hayhooks.server.pipelines import registry +from hayhooks.server.utils.base_pipeline_wrapper import BasePipelineWrapper router = APIRouter() @router.get("/draw/{pipeline_name}", tags=["config"]) async def draw(pipeline_name): - pipeline = registry.get(pipeline_name, use_pipeline=True) + pipeline = registry.get(pipeline_name) + + if isinstance(pipeline, BasePipelineWrapper): + pipeline = pipeline.pipeline + if not pipeline: raise HTTPException(status_code=404) diff --git a/src/hayhooks/server/utils/deploy_utils.py b/src/hayhooks/server/utils/deploy_utils.py index 5ccf203..f0846f7 100644 --- a/src/hayhooks/server/utils/deploy_utils.py +++ b/src/hayhooks/server/utils/deploy_utils.py @@ -1,10 +1,17 @@ +import inspect import importlib.util -from typing import Union -from fastapi import HTTPException +from types import ModuleType +from typing import Callable, Union, List +from fastapi import FastAPI, HTTPException from fastapi.concurrency import run_in_threadpool from fastapi.responses import JSONResponse from pathlib import Path -from hayhooks.server.exceptions import PipelineFilesError +from hayhooks.server.exceptions import ( + PipelineAlreadyExistsError, + PipelineFilesError, + PipelineModuleLoadError, + PipelineWrapperError, +) from hayhooks.server.pipelines import registry from hayhooks.server.pipelines.models import ( PipelineDefinition, @@ -14,6 +21,18 @@ ) from hayhooks.server.logger import log from hayhooks.settings import settings +from pydantic import BaseModel, create_model + + +class ChatRequest(BaseModel): + user_message: str + model_id: str + messages: List[dict] + body: dict + + +class ChatResponse(BaseModel): + result: dict def deploy_pipeline_def(app, pipeline_def: PipelineDefinition): @@ -89,7 +108,7 @@ def save_pipeline_files( raise PipelineFilesError(f"Failed to save pipeline files: {str(e)}") from e -def load_pipeline_module(pipeline_name: str, folder_path: Union[Path, str]): +def load_pipeline_module(pipeline_name: str, folder_path: Union[Path, str]) -> ModuleType: """Load a pipeline module from a folder path. Args: @@ -102,30 +121,147 @@ def load_pipeline_module(pipeline_name: str, folder_path: Union[Path, str]): Raises: ValueError: If the module cannot be loaded """ - log.debug(f"Loading pipeline module spec for {pipeline_name}") try: folder_path = Path(folder_path) wrapper_path = folder_path / "pipeline_wrapper.py" if not wrapper_path.exists(): - raise ValueError(f"Required file '{wrapper_path}' not found") + raise PipelineWrapperError(f"Required file '{wrapper_path}' not found") spec = importlib.util.spec_from_file_location(pipeline_name, wrapper_path) - if spec is None: - raise ValueError(f"Failed to load '{pipeline_name}' pipeline module spec") + if spec is None or spec.loader is None: + raise PipelineModuleLoadError( + f"Failed to load pipeline module '{pipeline_name}' - module loader not available" + ) module = importlib.util.module_from_spec(spec) - if spec.loader is None: - raise ValueError(f"Failed to load '{pipeline_name}' pipeline module") - spec.loader.exec_module(module) log.debug(f"Loaded module {module}") if not hasattr(module, "PipelineWrapper"): - raise ValueError(f"Failed to load '{pipeline_name}' pipeline module spec") + raise PipelineWrapperError(f"Failed to load '{pipeline_name}' pipeline module spec") return module except Exception as e: log.error(f"Error loading pipeline module: {str(e)}") - raise ValueError(f"Failed to load pipeline module: {str(e)}") from e + raise PipelineModuleLoadError(f"Failed to load pipeline module '{pipeline_name}' - {str(e)}") from e + + +def create_request_model_from_callable(func: Callable, model_name: str): + """Create a dynamic Pydantic model based on callable's signature. + + Args: + func: The callable (function/method) to analyze + model_name: Name for the generated model + + Returns: + Pydantic model class for request + """ + + params = inspect.signature(func).parameters + fields = { + name: (param.annotation, ... if param.default == param.empty else param.default) + for name, param in params.items() + } + return create_model(f'{model_name}Request', **fields) + + +def create_response_model_from_callable(func: Callable, model_name: str): + """Create a dynamic Pydantic model based on callable's return type. + + Args: + func: The callable (function/method) to analyze + model_name: Name for the generated model + + Returns: + Pydantic model class for response + """ + + return_type = inspect.signature(func).return_annotation + return create_model(f'{model_name}Response', result=(return_type, ...)) + + +def deploy_pipeline_files(app: FastAPI, pipeline_name: str, files: dict[str, str]): + """Deploy pipeline files to the FastAPI application and set up endpoints. + + Args: + app: FastAPI application instance + pipeline_name: Name of the pipeline to deploy + files: Dictionary mapping filenames to their contents + + Returns: + dict: Dictionary containing the deployed pipeline name + + Raises: + PipelineFilesError: If there are issues saving or loading pipeline files + """ + + log.debug(f"Checking if pipeline '{pipeline_name}' already exists: {registry.get(pipeline_name)}") + if registry.get(pipeline_name): + raise PipelineAlreadyExistsError(f"Pipeline '{pipeline_name}' already exists") + + log.debug(f"Saving pipeline files for '{pipeline_name}'") + save_pipeline_files(pipeline_name, files=files) + + pipeline_dir = Path(settings.pipelines_dir) / pipeline_name + clog = log.bind(pipeline_name=pipeline_name, pipeline_dir=str(pipeline_dir), files=list(files.keys())) + + clog.debug("Loading pipeline module") + module = load_pipeline_module(pipeline_name, folder_path=pipeline_dir) + + clog.debug("Creating PipelineWrapper instance") + pipeline_wrapper = module.PipelineWrapper() + + clog.debug("Running setup()") + pipeline_wrapper.setup() + + clog.debug("Adding pipeline to registry") + registry.add(pipeline_name, pipeline_wrapper) + + clog.debug("Creating dynamic Pydantic models for run_api") + RunRequest = create_request_model_from_callable(pipeline_wrapper.run_api, f'{pipeline_name}Run') + RunResponse = create_response_model_from_callable(pipeline_wrapper.run_api, f'{pipeline_name}Run') + + clog.debug("Creating endpoints") + + async def run_endpoint(run_req: RunRequest) -> JSONResponse: # type: ignore + result = await run_in_threadpool(pipeline_wrapper.run_api, urls=run_req.urls, question=run_req.question) + return JSONResponse({"result": result}, status_code=200) + + async def chat_endpoint(chat_req: ChatRequest) -> JSONResponse: + result = await run_in_threadpool( + pipeline_wrapper.run_chat, + user_message=chat_req.user_message, + model_id=chat_req.model_id, + messages=chat_req.messages, + body=chat_req.body, + ) + return JSONResponse({"result": result}, status_code=200) + + # Add routes + app.add_api_route( + path=f"/{pipeline_name}/run", + endpoint=run_endpoint, + methods=["POST"], + name=f"{pipeline_name}_run", + response_model=RunResponse, + tags=["pipelines"], + ) + + app.add_api_route( + path=f"/{pipeline_name}/chat", + endpoint=chat_endpoint, + methods=["POST"], + name=f"{pipeline_name}_chat", + response_model=ChatResponse, + tags=["pipelines"], + ) + + clog.debug("Setting up FastAPI app") + app.openapi_schema = None + app.setup() + + clog.success("Pipeline deployment complete") + + return {"name": pipeline_name} diff --git a/tests/test_deploy_utils.py b/tests/test_deploy_utils.py index 7e32c1a..004b38b 100644 --- a/tests/test_deploy_utils.py +++ b/tests/test_deploy_utils.py @@ -2,8 +2,19 @@ import shutil from pathlib import Path from typing import Callable -from hayhooks.server.utils.deploy_utils import load_pipeline_module, save_pipeline_files -from hayhooks.server.exceptions import PipelineFilesError +from hayhooks.server.utils.deploy_utils import ( + load_pipeline_module, + save_pipeline_files, + create_request_model_from_callable, + create_response_model_from_callable, + deploy_pipeline_files, +) +from hayhooks.server.exceptions import ( + PipelineFilesError, + PipelineModuleLoadError, + PipelineWrapperError, + PipelineAlreadyExistsError, +) TEST_PIPELINES_DIR = Path("tests/test_files/test_pipelines") @@ -34,7 +45,7 @@ def test_load_pipeline_wrong_folder(): pipeline_folder_path = Path("tests/test_files/python/wrong_folder") with pytest.raises( - ValueError, + PipelineModuleLoadError, match="Required file 'tests/test_files/python/wrong_folder/pipeline_wrapper.py' not found", ): load_pipeline_module(pipeline_name, pipeline_folder_path) @@ -45,7 +56,7 @@ def test_load_pipeline_no_wrapper(): pipeline_folder_path = Path("tests/test_files/python/no_wrapper") with pytest.raises( - ValueError, + PipelineModuleLoadError, match="Required file 'tests/test_files/python/no_wrapper/pipeline_wrapper.py' not found", ): load_pipeline_module(pipeline_name, pipeline_folder_path) @@ -88,3 +99,29 @@ def test_save_pipeline_files_raises_error(tmp_path): save_pipeline_files(pipeline_name="test_pipeline", files=files, pipelines_dir=str(readonly_dir)) assert "Failed to save pipeline files" in str(exc_info.value) + + +def test_create_request_model_from_callable(): + def sample_func(name: str, age: int = 25, optional: str = ""): + pass + + model = create_request_model_from_callable(sample_func, "Test") + + assert model.__name__ == "TestRequest" + assert model.model_fields["name"].annotation == str + assert model.model_fields["name"].is_required + assert model.model_fields["age"].annotation == int + assert model.model_fields["age"].default == 25 + assert model.model_fields["optional"].annotation == str + assert model.model_fields["optional"].default == "" + + +def test_create_response_model_from_callable(): + def sample_func() -> dict: + return {"result": "test"} + + model = create_response_model_from_callable(sample_func, "Test") + + assert model.__name__ == "TestResponse" + assert model.model_fields["result"].annotation == dict + assert model.model_fields["result"].is_required diff --git a/tests/test_files/python/chat_with_website/pipeline_wrapper.py b/tests/test_files/python/chat_with_website/pipeline_wrapper.py index bb3f68a..5473d78 100644 --- a/tests/test_files/python/chat_with_website/pipeline_wrapper.py +++ b/tests/test_files/python/chat_with_website/pipeline_wrapper.py @@ -12,9 +12,11 @@ def setup(self) -> None: pipeline_yaml = (Path(__file__).parent / "chat_with_website.yml").read_text() self.pipeline = Pipeline.loads(pipeline_yaml) - def run_api(self, urls: List[str], question: str) -> dict: - return self.pipeline.run({"fetcher": {"urls": urls}, "prompt": {"query": question}}) + def run_api(self, urls: List[str], question: str) -> str: + result = self.pipeline.run({"fetcher": {"urls": urls}, "prompt": {"query": question}}) + return result["llm"]["replies"][0] - def run_chat(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> dict: + def run_chat(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> str: question = user_message - return self.pipeline.run({"fetcher": {"urls": URLS}, "prompt": {"query": question}}) + result = self.pipeline.run({"fetcher": {"urls": URLS}, "prompt": {"query": question}}) + return result["llm"]["replies"][0] diff --git a/tests/test_it_deploy.py b/tests/test_it_deploy.py index cd5fffb..75f0d25 100644 --- a/tests/test_it_deploy.py +++ b/tests/test_it_deploy.py @@ -10,6 +10,7 @@ @pytest.fixture(autouse=True) def clear_registry(): registry.clear() + yield # Load pipeline definitions from test_files diff --git a/tests/test_it_deploy_files.py b/tests/test_it_deploy_files.py new file mode 100644 index 0000000..820dc1b --- /dev/null +++ b/tests/test_it_deploy_files.py @@ -0,0 +1,82 @@ +import pytest +from fastapi.testclient import TestClient +from hayhooks.server import app +from pathlib import Path +from hayhooks.server.pipelines.registry import registry +from hayhooks.settings import settings +import shutil + +client = TestClient(app) + + +def cleanup(): + registry.clear() + if Path(settings.pipelines_dir).exists(): + shutil.rmtree(settings.pipelines_dir) + + +@pytest.fixture(autouse=True) +def clear_registry(): + cleanup() + yield + + +@pytest.fixture(scope="session", autouse=True) +def final_cleanup(): + yield + cleanup() + + +# Load test pipeline files from test directory +TEST_FILES_DIR = Path(__file__).parent / "test_files/python/chat_with_website" +PIPELINE_FILES = { + "pipeline_wrapper.py": (TEST_FILES_DIR / "pipeline_wrapper.py").read_text(), + "chat_with_website.yml": (TEST_FILES_DIR / "chat_with_website.yml").read_text(), +} + + +def test_deploy_files_ok(status_pipeline): + pipeline_data = {"name": "test_pipeline", "files": PIPELINE_FILES} + + response = client.post("/deploy_files", json=pipeline_data) + assert response.status_code == 200 + assert response.json() == {"name": "test_pipeline"} + + status_response = status_pipeline(client, pipeline_data["name"]) + assert pipeline_data["name"] in status_response.json()["pipeline"] + + docs_response = client.get("/docs") + assert docs_response.status_code == 200 + + status_response = status_pipeline(client, pipeline_data["name"]) + assert pipeline_data["name"] in status_response.json()["pipeline"] + + +def test_deploy_files_missing_wrapper(): + pipeline_data = {"name": "test_pipeline", "files": PIPELINE_FILES.copy()} + pipeline_data["files"].pop("pipeline_wrapper.py") + + response = client.post("/deploy_files", json=pipeline_data) + assert response.status_code == 422 + assert "Required file" in response.json()["detail"] + + +def test_deploy_files_invalid_wrapper(): + invalid_files = { + "pipeline_wrapper.py": "invalid python code", + "chat_with_website.yml": PIPELINE_FILES["chat_with_website.yml"], + } + + response = client.post("/deploy_files", json={"name": "test_pipeline", "files": invalid_files}) + assert response.status_code == 422 + assert "Failed to load pipeline module" in response.json()["detail"] + + +def test_deploy_files_duplicate_pipeline(): + response = client.post("/deploy_files", json={"name": "test_pipeline", "files": PIPELINE_FILES}) + assert response.status_code == 200 + + response = client.post("/deploy_files", json={"name": "test_pipeline", "files": PIPELINE_FILES}) + print(response.json()) + assert response.status_code == 409 + assert "Pipeline 'test_pipeline' already exists" in response.json()["detail"] From 62a063995dbf1953aea0c82ed0666059e235675c Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Thu, 23 Jan 2025 11:29:05 +0100 Subject: [PATCH 11/23] add CLI for deploy-files command --- src/hayhooks/cli/__init__.py | 2 + src/hayhooks/cli/deploy_files/__init__.py | 55 +++++++++++++++++++++++ 2 files changed, 57 insertions(+) create mode 100644 src/hayhooks/cli/deploy_files/__init__.py diff --git a/src/hayhooks/cli/__init__.py b/src/hayhooks/cli/__init__.py index cb518b4..13b1abb 100644 --- a/src/hayhooks/cli/__init__.py +++ b/src/hayhooks/cli/__init__.py @@ -3,6 +3,7 @@ # SPDX-License-Identifier: Apache-2.0 import click +from hayhooks.cli.deploy_files import deploy_files from hayhooks.cli.run import run from hayhooks.cli.deploy import deploy from hayhooks.cli.status import status @@ -22,3 +23,4 @@ def hayhooks(ctx, server, disable_ssl): hayhooks.add_command(deploy) hayhooks.add_command(status) hayhooks.add_command(undeploy) +hayhooks.add_command(deploy_files) diff --git a/src/hayhooks/cli/deploy_files/__init__.py b/src/hayhooks/cli/deploy_files/__init__.py new file mode 100644 index 0000000..9ece750 --- /dev/null +++ b/src/hayhooks/cli/deploy_files/__init__.py @@ -0,0 +1,55 @@ +import click +import requests +from pathlib import Path +from urllib.parse import urljoin + + +def should_skip_file(file_path: Path) -> bool: + """ + Determine if a file should be skipped during deployment. + Handles hidden files across different operating systems. + """ + if file_path.is_dir(): + return True + + if file_path.name.startswith('.'): + return True + + return False + + +@click.command() +@click.pass_obj +@click.option('-n', '--name', required=True, help="Name of the pipeline to deploy") +@click.argument('folder', type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path)) +def deploy_files(server_conf, name, folder): + """Deploy all pipeline files from a folder to the Hayhooks server.""" + server, disable_ssl = server_conf + + files_dict = {} + try: + for file_path in folder.iterdir(): + if should_skip_file(file_path): + continue + + try: + files_dict[file_path.name] = file_path.read_text() + except Exception as e: + click.echo(f"Error reading file {file_path}: {str(e)}") + return + + if not files_dict: + click.echo("Error: No valid files found in the specified folder") + return + + resp = requests.post( + urljoin(server, "deploy_files"), json={"name": name, "files": files_dict}, verify=not disable_ssl + ) + + if resp.status_code >= 400: + click.echo(f"Error deploying pipeline: {resp.json().get('detail')}") + else: + click.echo(f"Pipeline successfully deployed with name: {resp.json().get('name')}") + + except Exception as e: + click.echo(f"Error processing folder: {str(e)}") From 61bb50eca9fb60b6827104f4b9225224ea6e038f Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Thu, 23 Jan 2025 11:29:19 +0100 Subject: [PATCH 12/23] handle exceptions during pipeline execution on dynamically added API routes --- src/hayhooks/server/utils/deploy_utils.py | 22 ++++++++++++++++++- tests/test_deploy_utils.py | 3 --- .../chat_with_website/pipeline_wrapper.py | 5 +++++ tests/test_it_deploy_files.py | 15 ++++++++++++- 4 files changed, 40 insertions(+), 5 deletions(-) diff --git a/src/hayhooks/server/utils/deploy_utils.py b/src/hayhooks/server/utils/deploy_utils.py index f0846f7..80b273a 100644 --- a/src/hayhooks/server/utils/deploy_utils.py +++ b/src/hayhooks/server/utils/deploy_utils.py @@ -1,5 +1,6 @@ import inspect import importlib.util +from functools import wraps from types import ModuleType from typing import Callable, Union, List from fastapi import FastAPI, HTTPException @@ -182,6 +183,23 @@ def create_response_model_from_callable(func: Callable, model_name: str): return create_model(f'{model_name}Response', result=(return_type, ...)) +def handle_pipeline_exceptions(): + """Decorator to handle pipeline execution exceptions.""" + + def decorator(func): + @wraps(func) # Preserve the original function's metadata + async def wrapper(*args, **kwargs): + try: + return await func(*args, **kwargs) + except Exception as e: + log.error(f"Pipeline execution error: {str(e)}") + raise HTTPException(status_code=500, detail=f"Pipeline execution failed: {str(e)}") + + return wrapper + + return decorator + + def deploy_pipeline_files(app: FastAPI, pipeline_name: str, files: dict[str, str]): """Deploy pipeline files to the FastAPI application and set up endpoints. @@ -223,12 +241,14 @@ def deploy_pipeline_files(app: FastAPI, pipeline_name: str, files: dict[str, str RunRequest = create_request_model_from_callable(pipeline_wrapper.run_api, f'{pipeline_name}Run') RunResponse = create_response_model_from_callable(pipeline_wrapper.run_api, f'{pipeline_name}Run') - clog.debug("Creating endpoints") + clog.debug("Adding new API endpoints") + @handle_pipeline_exceptions() async def run_endpoint(run_req: RunRequest) -> JSONResponse: # type: ignore result = await run_in_threadpool(pipeline_wrapper.run_api, urls=run_req.urls, question=run_req.question) return JSONResponse({"result": result}, status_code=200) + @handle_pipeline_exceptions() async def chat_endpoint(chat_req: ChatRequest) -> JSONResponse: result = await run_in_threadpool( pipeline_wrapper.run_chat, diff --git a/tests/test_deploy_utils.py b/tests/test_deploy_utils.py index 004b38b..30ed299 100644 --- a/tests/test_deploy_utils.py +++ b/tests/test_deploy_utils.py @@ -7,13 +7,10 @@ save_pipeline_files, create_request_model_from_callable, create_response_model_from_callable, - deploy_pipeline_files, ) from hayhooks.server.exceptions import ( PipelineFilesError, PipelineModuleLoadError, - PipelineWrapperError, - PipelineAlreadyExistsError, ) TEST_PIPELINES_DIR = Path("tests/test_files/test_pipelines") diff --git a/tests/test_files/python/chat_with_website/pipeline_wrapper.py b/tests/test_files/python/chat_with_website/pipeline_wrapper.py index 5473d78..43ed503 100644 --- a/tests/test_files/python/chat_with_website/pipeline_wrapper.py +++ b/tests/test_files/python/chat_with_website/pipeline_wrapper.py @@ -2,6 +2,7 @@ from typing import List from haystack import Pipeline from hayhooks.server.utils.base_pipeline_wrapper import BasePipelineWrapper +from hayhooks.server.logger import log URLS = ["https://haystack.deepset.ai", "https://www.redis.io"] @@ -13,10 +14,14 @@ def setup(self) -> None: self.pipeline = Pipeline.loads(pipeline_yaml) def run_api(self, urls: List[str], question: str) -> str: + log.trace(f"Running pipeline with urls: {urls} and question: {question}") result = self.pipeline.run({"fetcher": {"urls": urls}, "prompt": {"query": question}}) return result["llm"]["replies"][0] def run_chat(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> str: + log.trace( + f"Running pipeline with user_message: {user_message}, model_id: {model_id}, messages: {messages}, body: {body}" + ) question = user_message result = self.pipeline.run({"fetcher": {"urls": URLS}, "prompt": {"query": question}}) return result["llm"]["replies"][0] diff --git a/tests/test_it_deploy_files.py b/tests/test_it_deploy_files.py index 820dc1b..146c004 100644 --- a/tests/test_it_deploy_files.py +++ b/tests/test_it_deploy_files.py @@ -77,6 +77,19 @@ def test_deploy_files_duplicate_pipeline(): assert response.status_code == 200 response = client.post("/deploy_files", json={"name": "test_pipeline", "files": PIPELINE_FILES}) - print(response.json()) assert response.status_code == 409 assert "Pipeline 'test_pipeline' already exists" in response.json()["detail"] + + +def test_pipeline_endpoint_error_handling(): + pipeline_data = {"name": "test_pipeline", "files": PIPELINE_FILES} + + response = client.post("/deploy_files", json=pipeline_data) + assert response.status_code == 200 + + run_response = client.post( + "/test_pipeline/run", + json={"urls": ["hptts://www.redis.io"], "question": "What is Redis?"}, # malformed url should trigger an error + ) + assert run_response.status_code == 500 + assert "Pipeline execution failed" in run_response.json()["detail"] From 8a677aaf176ffca0f92d0447ef297a403874b6db Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Thu, 23 Jan 2025 15:59:29 +0100 Subject: [PATCH 13/23] Ignore default pipelines folder --- .gitignore | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.gitignore b/.gitignore index e68aac6..074972f 100644 --- a/.gitignore +++ b/.gitignore @@ -159,3 +159,7 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ + + +# Pipelines default folder +/pipelines From c448a387708e36b809b873771387b9712bd3dee6 Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Thu, 23 Jan 2025 15:59:53 +0100 Subject: [PATCH 14/23] Rewrote pipeline loading logic at startup --- src/hayhooks/cli/deploy_files/__init__.py | 25 +------ src/hayhooks/server/app.py | 91 +++++++++++++++++++---- src/hayhooks/server/utils/deploy_utils.py | 30 ++++++++ src/hayhooks/settings.py | 3 + 4 files changed, 112 insertions(+), 37 deletions(-) diff --git a/src/hayhooks/cli/deploy_files/__init__.py b/src/hayhooks/cli/deploy_files/__init__.py index 9ece750..2ffc448 100644 --- a/src/hayhooks/cli/deploy_files/__init__.py +++ b/src/hayhooks/cli/deploy_files/__init__.py @@ -2,20 +2,7 @@ import requests from pathlib import Path from urllib.parse import urljoin - - -def should_skip_file(file_path: Path) -> bool: - """ - Determine if a file should be skipped during deployment. - Handles hidden files across different operating systems. - """ - if file_path.is_dir(): - return True - - if file_path.name.startswith('.'): - return True - - return False +from hayhooks.server.utils.deploy_utils import read_pipeline_files_from_folder @click.command() @@ -28,15 +15,7 @@ def deploy_files(server_conf, name, folder): files_dict = {} try: - for file_path in folder.iterdir(): - if should_skip_file(file_path): - continue - - try: - files_dict[file_path.name] = file_path.read_text() - except Exception as e: - click.echo(f"Error reading file {file_path}: {str(e)}") - return + files_dict = read_pipeline_files_from_folder(folder) if not files_dict: click.echo("Error: No valid files found in the specified folder") diff --git a/src/hayhooks/server/app.py b/src/hayhooks/server/app.py index 372390e..638873e 100644 --- a/src/hayhooks/server/app.py +++ b/src/hayhooks/server/app.py @@ -1,16 +1,72 @@ -import logging -import os -import glob from fastapi import FastAPI from pathlib import Path -from hayhooks.server.utils.deploy_utils import deploy_pipeline_def, PipelineDefinition +from hayhooks.server.utils.deploy_utils import ( + deploy_pipeline_def, + PipelineDefinition, + deploy_pipeline_files, + read_pipeline_files_from_folder, +) from hayhooks.server.routers import status_router, draw_router, deploy_router, undeploy_router from hayhooks.settings import settings +from hayhooks.server.logger import log -logger = logging.getLogger("uvicorn.info") + +def deploy_yaml_pipeline(app: FastAPI, pipeline_file_path: Path) -> dict: + """ + Deploy a pipeline from a YAML file. + + Args: + app: FastAPI application instance + pipeline_file_path: Path to the YAML pipeline definition + + Returns: + dict: Deployment result containing pipeline name + """ + name = pipeline_file_path.stem + with open(pipeline_file_path, "r") as pipeline_file: + source_code = pipeline_file.read() + + pipeline_definition = PipelineDefinition(name=name, source_code=source_code) + deployed_pipeline = deploy_pipeline_def(app, pipeline_definition) + log.info(f"Deployed pipeline from yaml: {deployed_pipeline['name']}") + return deployed_pipeline + + +def deploy_files_pipeline(app: FastAPI, pipeline_dir: Path) -> dict: + """ + Deploy a pipeline from a directory containing multiple files. + + Args: + app: FastAPI application instance + pipeline_dir: Path to the pipeline directory + + Returns: + dict: Deployment result containing pipeline name + """ + name = pipeline_dir.name + files = read_pipeline_files_from_folder(pipeline_dir) + + if files: + deployed_pipeline = deploy_pipeline_files(app, name, files) + log.info(f"Deployed pipeline from directory: {deployed_pipeline['name']}") + return deployed_pipeline + return {"name": name} def create_app() -> FastAPI: + """ + Create and configure a FastAPI application. + + This function initializes a FastAPI application with the following features: + - Configures root path from settings if provided + - Includes all router endpoints (status, draw, deploy, undeploy) + - Auto-deploys pipelines from the configured pipelines directory: + - YAML pipeline definitions (*.yml, *.yaml) + - Pipeline folders containing multiple files + + Returns: + FastAPI: Configured FastAPI application instance + """ if root_path := settings.root_path: app = FastAPI(root_path=root_path) else: @@ -26,15 +82,22 @@ def create_app() -> FastAPI: pipelines_dir = settings.pipelines_dir if pipelines_dir: - logger.info(f"Pipelines dir set to: {pipelines_dir}") - for pipeline_file_path in glob.glob(f"{pipelines_dir}/*.y*ml"): - name = Path(pipeline_file_path).stem - with open(pipeline_file_path, "r") as pipeline_file: - source_code = pipeline_file.read() - - pipeline_defintion = PipelineDefinition(name=name, source_code=source_code) - deployed_pipeline = deploy_pipeline_def(app, pipeline_defintion) - logger.info(f"Deployed pipeline: {deployed_pipeline['name']}") + log.info(f"Pipelines dir set to: {pipelines_dir}") + pipelines_path = Path(pipelines_dir) + + yaml_files = list(pipelines_path.glob("*.y*ml")) + pipeline_dirs = [d for d in pipelines_path.iterdir() if d.is_dir()] + + if yaml_files: + log.info(f"Deploying {len(yaml_files)} pipeline(s) from YAML files") + for pipeline_file_path in yaml_files: + deploy_yaml_pipeline(app, pipeline_file_path) + + if pipeline_dirs: + log.info(f"Deploying {len(pipeline_dirs)} pipeline(s) from folders") + for pipeline_dir in pipeline_dirs: + deploy_files_pipeline(app, pipeline_dir) + return app diff --git a/src/hayhooks/server/utils/deploy_utils.py b/src/hayhooks/server/utils/deploy_utils.py index 80b273a..84f6061 100644 --- a/src/hayhooks/server/utils/deploy_utils.py +++ b/src/hayhooks/server/utils/deploy_utils.py @@ -285,3 +285,33 @@ async def chat_endpoint(chat_req: ChatRequest) -> JSONResponse: clog.success("Pipeline deployment complete") return {"name": pipeline_name} + + +def read_pipeline_files_from_folder(folder_path: Path) -> dict[str, str]: + """Read pipeline files from a folder and return a dictionary mapping filenames to their contents. + Skips directories, hidden files, and common Python artifacts. + + Args: + folder_path: Path to the folder containing the pipeline files + + Returns: + Dictionary mapping filenames to their contents + """ + + files = {} + for file_path in folder_path.rglob("*"): + # Skip directories and hidden files + if file_path.is_dir() or file_path.name.startswith('.'): + continue + + # Skip files matching ignore patterns + if any(file_path.match(pattern) for pattern in settings.files_to_ignore_patterns): + continue + + try: + files[str(file_path.relative_to(folder_path))] = file_path.read_text(encoding="utf-8", errors="ignore") + except Exception as e: + log.warning(f"Skipping file {file_path}: {str(e)}") + continue + + return files diff --git a/src/hayhooks/settings.py b/src/hayhooks/settings.py index c07664b..cde0036 100644 --- a/src/hayhooks/settings.py +++ b/src/hayhooks/settings.py @@ -22,6 +22,9 @@ class AppSettings(BaseSettings): # Port for the FastAPI app port: int = 1416 + # Files to ignore when reading pipeline files from a folder + files_to_ignore_patterns: list[str] = ["*.pyc", "*.pyo", "*.pyd", "__pycache__", "*.so", "*.egg", "*.egg-info"] + @field_validator("pipelines_dir") def validate_pipelines_dir(cls, v): path = Path(v) From d9916b6c86c97352ec99b6acb398a01952abf19f Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Thu, 23 Jan 2025 17:09:55 +0100 Subject: [PATCH 15/23] Add tests for pipelines loading at app startup --- src/hayhooks/server/app.py | 12 +++++++-- tests/test_deploy_files_at_startup copy.py | 31 ++++++++++++++++++++++ tests/test_deploy_yaml_at_startup.py | 30 +++++++++++++++++++++ 3 files changed, 71 insertions(+), 2 deletions(-) create mode 100644 tests/test_deploy_files_at_startup copy.py create mode 100644 tests/test_deploy_yaml_at_startup.py diff --git a/src/hayhooks/server/app.py b/src/hayhooks/server/app.py index 638873e..44901b9 100644 --- a/src/hayhooks/server/app.py +++ b/src/hayhooks/server/app.py @@ -91,12 +91,20 @@ def create_app() -> FastAPI: if yaml_files: log.info(f"Deploying {len(yaml_files)} pipeline(s) from YAML files") for pipeline_file_path in yaml_files: - deploy_yaml_pipeline(app, pipeline_file_path) + try: + deploy_yaml_pipeline(app, pipeline_file_path) + except Exception as e: + log.warning(f"Skipping pipeline file {pipeline_file_path}: {str(e)}") + continue if pipeline_dirs: log.info(f"Deploying {len(pipeline_dirs)} pipeline(s) from folders") for pipeline_dir in pipeline_dirs: - deploy_files_pipeline(app, pipeline_dir) + try: + deploy_files_pipeline(app, pipeline_dir) + except Exception as e: + log.warning(f"Skipping pipeline folder {pipeline_dir}: {str(e)}") + continue return app diff --git a/tests/test_deploy_files_at_startup copy.py b/tests/test_deploy_files_at_startup copy.py new file mode 100644 index 0000000..2b6dfaa --- /dev/null +++ b/tests/test_deploy_files_at_startup copy.py @@ -0,0 +1,31 @@ +import pytest +from pathlib import Path +from fastapi.testclient import TestClient +from hayhooks.server.app import create_app +from hayhooks.settings import settings + + +@pytest.fixture +def test_pipelines_dir(): + return Path("tests/test_files/python") + + +@pytest.fixture +def app_with_pipelines(test_pipelines_dir, monkeypatch): + monkeypatch.setattr(settings, "pipelines_dir", str(test_pipelines_dir)) + app = create_app() + return app + + +@pytest.fixture +def test_client(app_with_pipelines): + return TestClient(app_with_pipelines) + + +def test_app_loads_pipeline_from_directory(test_client, test_pipelines_dir): + response = test_client.get("/status") + assert response.status_code == 200 + + pipelines = response.json()["pipelines"] + assert len(pipelines) == 1 # only one pipeline should be loaded + assert "chat_with_website" in pipelines diff --git a/tests/test_deploy_yaml_at_startup.py b/tests/test_deploy_yaml_at_startup.py new file mode 100644 index 0000000..6dbaef0 --- /dev/null +++ b/tests/test_deploy_yaml_at_startup.py @@ -0,0 +1,30 @@ +import pytest +from pathlib import Path +from fastapi.testclient import TestClient +from hayhooks.server.app import create_app +from hayhooks.settings import settings + + +@pytest.fixture +def test_pipelines_dir(): + return Path("tests/test_files/yaml/working_pipelines") + + +@pytest.fixture +def app_with_pipelines(test_pipelines_dir, monkeypatch): + monkeypatch.setattr(settings, "pipelines_dir", str(test_pipelines_dir)) + app = create_app() + return app + + +@pytest.fixture +def test_client(app_with_pipelines): + return TestClient(app_with_pipelines) + + +def test_app_loads_pipeline_from_directory(test_client, test_pipelines_dir): + response = test_client.get("/status") + assert response.status_code == 200 + + pipelines = response.json()["pipelines"] + assert len(pipelines) == len(list(test_pipelines_dir.rglob("*"))) From 08b45def95d6d01662586779e4077bed81307533 Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Thu, 23 Jan 2025 18:10:22 +0100 Subject: [PATCH 16/23] Update logger format --- src/hayhooks/server/logger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/hayhooks/server/logger.py b/src/hayhooks/server/logger.py index 5fd3738..8ce4ce6 100644 --- a/src/hayhooks/server/logger.py +++ b/src/hayhooks/server/logger.py @@ -5,7 +5,7 @@ def formatter(record): if record["extra"]: - return "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} | {message}\n{extra}\n" + return "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} | {message} - {extra}\n" return "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} | {message}\n" From 2078bfa941590cad1be83bc36a8bc681134ba688 Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Fri, 24 Jan 2025 11:22:21 +0100 Subject: [PATCH 17/23] Fix filename --- ...y_files_at_startup copy.py => test_deploy_files_at_startup.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/{test_deploy_files_at_startup copy.py => test_deploy_files_at_startup.py} (100%) diff --git a/tests/test_deploy_files_at_startup copy.py b/tests/test_deploy_files_at_startup.py similarity index 100% rename from tests/test_deploy_files_at_startup copy.py rename to tests/test_deploy_files_at_startup.py From 5f6809de45a67559c19031079d906607914076d2 Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Fri, 24 Jan 2025 15:07:30 +0100 Subject: [PATCH 18/23] Unified tests on pipeline deploy at startup ; add test for yaml + files pipeline deployment --- tests/test_deploy_at_startup.py | 88 +++++++++++++++++++ tests/test_deploy_files_at_startup.py | 31 ------- tests/test_deploy_utils.py | 10 +-- tests/test_deploy_yaml_at_startup.py | 30 ------- .../chat_with_website/chat_with_website.yml | 0 .../chat_with_website/pipeline_wrapper.py | 0 .../no_wrapper/chat_with_website.yml | 0 tests/test_files/mixed/basic_rag_pipeline.yml | 72 +++++++++++++++ .../chat_with_website/chat_with_website.yml | 50 +++++++++++ .../chat_with_website/pipeline_wrapper.py | 27 ++++++ tests/test_it_deploy_files.py | 2 +- 11 files changed, 243 insertions(+), 67 deletions(-) create mode 100644 tests/test_deploy_at_startup.py delete mode 100644 tests/test_deploy_files_at_startup.py delete mode 100644 tests/test_deploy_yaml_at_startup.py rename tests/test_files/{python => files}/chat_with_website/chat_with_website.yml (100%) rename tests/test_files/{python => files}/chat_with_website/pipeline_wrapper.py (100%) rename tests/test_files/{python => files}/no_wrapper/chat_with_website.yml (100%) create mode 100644 tests/test_files/mixed/basic_rag_pipeline.yml create mode 100644 tests/test_files/mixed/chat_with_website/chat_with_website.yml create mode 100644 tests/test_files/mixed/chat_with_website/pipeline_wrapper.py diff --git a/tests/test_deploy_at_startup.py b/tests/test_deploy_at_startup.py new file mode 100644 index 0000000..aaadbea --- /dev/null +++ b/tests/test_deploy_at_startup.py @@ -0,0 +1,88 @@ +import pytest +from pathlib import Path +from fastapi.testclient import TestClient +from hayhooks.server.app import create_app +from hayhooks.settings import settings +from hayhooks.server.pipelines.registry import registry + + +@pytest.fixture(autouse=True) +def clear_registry(): + registry.clear() + yield + + +@pytest.fixture +def test_files_pipelines_dir(): + return Path("tests/test_files/files") + + +@pytest.fixture +def test_yaml_pipelines_dir(): + return Path("tests/test_files/yaml/working_pipelines") + + +@pytest.fixture +def test_mixed_pipelines_dir(): + return Path("tests/test_files/mixed") + + +@pytest.fixture +def app_with_files_pipelines(test_files_pipelines_dir, monkeypatch): + monkeypatch.setattr(settings, "pipelines_dir", str(test_files_pipelines_dir)) + app = create_app() + return app + + +@pytest.fixture +def app_with_yaml_pipelines(test_yaml_pipelines_dir, monkeypatch): + monkeypatch.setattr(settings, "pipelines_dir", str(test_yaml_pipelines_dir)) + app = create_app() + return app + + +@pytest.fixture +def app_with_mixed_pipelines(test_mixed_pipelines_dir, monkeypatch): + monkeypatch.setattr(settings, "pipelines_dir", str(test_mixed_pipelines_dir)) + app = create_app() + return app + + +@pytest.fixture +def test_client_files(app_with_files_pipelines): + return TestClient(app_with_files_pipelines) + + +@pytest.fixture +def test_client_yaml(app_with_yaml_pipelines): + return TestClient(app_with_yaml_pipelines) + + +@pytest.fixture +def test_client_mixed(app_with_mixed_pipelines): + return TestClient(app_with_mixed_pipelines) + + +def test_app_loads_pipeline_from_files_directory(test_client_files, test_files_pipelines_dir): + response = test_client_files.get("/status") + assert response.status_code == 200 + + pipelines = response.json()["pipelines"] + assert len(pipelines) == 1 # only one pipeline should be loaded + assert "chat_with_website" in pipelines + + +def test_app_loads_pipeline_from_yaml_directory(test_client_yaml, test_yaml_pipelines_dir): + response = test_client_yaml.get("/status") + assert response.status_code == 200 + + pipelines = response.json()["pipelines"] + assert len(pipelines) == len(list(test_yaml_pipelines_dir.rglob("*"))) + + +def test_app_loads_pipeline_from_mixed_directory(test_client_mixed, test_mixed_pipelines_dir): + response = test_client_mixed.get("/status") + assert response.status_code == 200 + + pipelines = response.json()["pipelines"] + assert len(pipelines) == 2 # 1 file, 1 yaml diff --git a/tests/test_deploy_files_at_startup.py b/tests/test_deploy_files_at_startup.py deleted file mode 100644 index 2b6dfaa..0000000 --- a/tests/test_deploy_files_at_startup.py +++ /dev/null @@ -1,31 +0,0 @@ -import pytest -from pathlib import Path -from fastapi.testclient import TestClient -from hayhooks.server.app import create_app -from hayhooks.settings import settings - - -@pytest.fixture -def test_pipelines_dir(): - return Path("tests/test_files/python") - - -@pytest.fixture -def app_with_pipelines(test_pipelines_dir, monkeypatch): - monkeypatch.setattr(settings, "pipelines_dir", str(test_pipelines_dir)) - app = create_app() - return app - - -@pytest.fixture -def test_client(app_with_pipelines): - return TestClient(app_with_pipelines) - - -def test_app_loads_pipeline_from_directory(test_client, test_pipelines_dir): - response = test_client.get("/status") - assert response.status_code == 200 - - pipelines = response.json()["pipelines"] - assert len(pipelines) == 1 # only one pipeline should be loaded - assert "chat_with_website" in pipelines diff --git a/tests/test_deploy_utils.py b/tests/test_deploy_utils.py index 30ed299..b1b7ed5 100644 --- a/tests/test_deploy_utils.py +++ b/tests/test_deploy_utils.py @@ -26,7 +26,7 @@ def cleanup_test_pipelines(): def test_load_pipeline_module(): pipeline_name = "chat_with_website" - pipeline_folder_path = Path("tests/test_files/python/chat_with_website") + pipeline_folder_path = Path("tests/test_files/files/chat_with_website") module = load_pipeline_module(pipeline_name, pipeline_folder_path) @@ -39,22 +39,22 @@ def test_load_pipeline_module(): def test_load_pipeline_wrong_folder(): pipeline_name = "chat_with_website" - pipeline_folder_path = Path("tests/test_files/python/wrong_folder") + pipeline_folder_path = Path("tests/test_files/files/wrong_folder") with pytest.raises( PipelineModuleLoadError, - match="Required file 'tests/test_files/python/wrong_folder/pipeline_wrapper.py' not found", + match="Required file 'tests/test_files/files/wrong_folder/pipeline_wrapper.py' not found", ): load_pipeline_module(pipeline_name, pipeline_folder_path) def test_load_pipeline_no_wrapper(): pipeline_name = "chat_with_website" - pipeline_folder_path = Path("tests/test_files/python/no_wrapper") + pipeline_folder_path = Path("tests/test_files/files/no_wrapper") with pytest.raises( PipelineModuleLoadError, - match="Required file 'tests/test_files/python/no_wrapper/pipeline_wrapper.py' not found", + match="Required file 'tests/test_files/files/no_wrapper/pipeline_wrapper.py' not found", ): load_pipeline_module(pipeline_name, pipeline_folder_path) diff --git a/tests/test_deploy_yaml_at_startup.py b/tests/test_deploy_yaml_at_startup.py deleted file mode 100644 index 6dbaef0..0000000 --- a/tests/test_deploy_yaml_at_startup.py +++ /dev/null @@ -1,30 +0,0 @@ -import pytest -from pathlib import Path -from fastapi.testclient import TestClient -from hayhooks.server.app import create_app -from hayhooks.settings import settings - - -@pytest.fixture -def test_pipelines_dir(): - return Path("tests/test_files/yaml/working_pipelines") - - -@pytest.fixture -def app_with_pipelines(test_pipelines_dir, monkeypatch): - monkeypatch.setattr(settings, "pipelines_dir", str(test_pipelines_dir)) - app = create_app() - return app - - -@pytest.fixture -def test_client(app_with_pipelines): - return TestClient(app_with_pipelines) - - -def test_app_loads_pipeline_from_directory(test_client, test_pipelines_dir): - response = test_client.get("/status") - assert response.status_code == 200 - - pipelines = response.json()["pipelines"] - assert len(pipelines) == len(list(test_pipelines_dir.rglob("*"))) diff --git a/tests/test_files/python/chat_with_website/chat_with_website.yml b/tests/test_files/files/chat_with_website/chat_with_website.yml similarity index 100% rename from tests/test_files/python/chat_with_website/chat_with_website.yml rename to tests/test_files/files/chat_with_website/chat_with_website.yml diff --git a/tests/test_files/python/chat_with_website/pipeline_wrapper.py b/tests/test_files/files/chat_with_website/pipeline_wrapper.py similarity index 100% rename from tests/test_files/python/chat_with_website/pipeline_wrapper.py rename to tests/test_files/files/chat_with_website/pipeline_wrapper.py diff --git a/tests/test_files/python/no_wrapper/chat_with_website.yml b/tests/test_files/files/no_wrapper/chat_with_website.yml similarity index 100% rename from tests/test_files/python/no_wrapper/chat_with_website.yml rename to tests/test_files/files/no_wrapper/chat_with_website.yml diff --git a/tests/test_files/mixed/basic_rag_pipeline.yml b/tests/test_files/mixed/basic_rag_pipeline.yml new file mode 100644 index 0000000..472b377 --- /dev/null +++ b/tests/test_files/mixed/basic_rag_pipeline.yml @@ -0,0 +1,72 @@ +components: + llm: + init_parameters: + api_base_url: null + api_key: + env_vars: + - OPENAI_API_KEY + strict: true + type: env_var + generation_kwargs: {} + model: gpt-4o-mini + organization: null + streaming_callback: null + system_prompt: null + type: haystack.components.generators.openai.OpenAIGenerator + prompt_builder: + init_parameters: + required_variables: null + template: "\nGiven the following information, answer the question.\n\nContext:\n\ + {% for document in documents %}\n {{ document.content }}\n{% endfor %}\n\ + \nQuestion: {{question}}\nAnswer:\n" + variables: null + type: haystack.components.builders.prompt_builder.PromptBuilder + retriever: + init_parameters: + document_store: + init_parameters: + bm25_algorithm: BM25L + bm25_parameters: {} + bm25_tokenization_regex: (?u)\b\w\w+\b + embedding_similarity_function: dot_product + index: d8b1f58f-20e9-4a57-a84d-a44fc651de4e + type: haystack.document_stores.in_memory.document_store.InMemoryDocumentStore + filter_policy: replace + filters: null + return_embedding: false + scale_score: false + top_k: 10 + type: haystack.components.retrievers.in_memory.embedding_retriever.InMemoryEmbeddingRetriever + text_embedder: + init_parameters: + batch_size: 32 + config_kwargs: null + device: + device: mps + type: single + model: sentence-transformers/all-MiniLM-L6-v2 + model_kwargs: null + normalize_embeddings: false + precision: float32 + prefix: '' + progress_bar: true + suffix: '' + token: + env_vars: + - HF_API_TOKEN + - HF_TOKEN + strict: false + type: env_var + tokenizer_kwargs: null + truncate_dim: null + trust_remote_code: false + type: haystack.components.embedders.sentence_transformers_text_embedder.SentenceTransformersTextEmbedder +connections: +- receiver: retriever.query_embedding + sender: text_embedder.embedding +- receiver: prompt_builder.documents + sender: retriever.documents +- receiver: llm.prompt + sender: prompt_builder.prompt +max_runs_per_component: 100 +metadata: {} diff --git a/tests/test_files/mixed/chat_with_website/chat_with_website.yml b/tests/test_files/mixed/chat_with_website/chat_with_website.yml new file mode 100644 index 0000000..db4063f --- /dev/null +++ b/tests/test_files/mixed/chat_with_website/chat_with_website.yml @@ -0,0 +1,50 @@ +components: + converter: + type: haystack.components.converters.html.HTMLToDocument + init_parameters: + extraction_kwargs: null + + fetcher: + init_parameters: + raise_on_failure: true + retry_attempts: 2 + timeout: 3 + user_agents: + - haystack/LinkContentFetcher/2.0.0b8 + type: haystack.components.fetchers.link_content.LinkContentFetcher + + llm: + init_parameters: + api_base_url: null + api_key: + env_vars: + - OPENAI_API_KEY + strict: true + type: env_var + generation_kwargs: {} + model: gpt-4o-mini + streaming_callback: null + system_prompt: null + type: haystack.components.generators.openai.OpenAIGenerator + + prompt: + init_parameters: + template: | + "According to the contents of this website: + {% for document in documents %} + {{document.content}} + {% endfor %} + Answer the given question: {{query}} + Answer: + " + type: haystack.components.builders.prompt_builder.PromptBuilder + +connections: + - receiver: converter.sources + sender: fetcher.streams + - receiver: prompt.documents + sender: converter.documents + - receiver: llm.prompt + sender: prompt.prompt + +metadata: {} diff --git a/tests/test_files/mixed/chat_with_website/pipeline_wrapper.py b/tests/test_files/mixed/chat_with_website/pipeline_wrapper.py new file mode 100644 index 0000000..43ed503 --- /dev/null +++ b/tests/test_files/mixed/chat_with_website/pipeline_wrapper.py @@ -0,0 +1,27 @@ +from pathlib import Path +from typing import List +from haystack import Pipeline +from hayhooks.server.utils.base_pipeline_wrapper import BasePipelineWrapper +from hayhooks.server.logger import log + + +URLS = ["https://haystack.deepset.ai", "https://www.redis.io"] + + +class PipelineWrapper(BasePipelineWrapper): + def setup(self) -> None: + pipeline_yaml = (Path(__file__).parent / "chat_with_website.yml").read_text() + self.pipeline = Pipeline.loads(pipeline_yaml) + + def run_api(self, urls: List[str], question: str) -> str: + log.trace(f"Running pipeline with urls: {urls} and question: {question}") + result = self.pipeline.run({"fetcher": {"urls": urls}, "prompt": {"query": question}}) + return result["llm"]["replies"][0] + + def run_chat(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> str: + log.trace( + f"Running pipeline with user_message: {user_message}, model_id: {model_id}, messages: {messages}, body: {body}" + ) + question = user_message + result = self.pipeline.run({"fetcher": {"urls": URLS}, "prompt": {"query": question}}) + return result["llm"]["replies"][0] diff --git a/tests/test_it_deploy_files.py b/tests/test_it_deploy_files.py index 146c004..8405eec 100644 --- a/tests/test_it_deploy_files.py +++ b/tests/test_it_deploy_files.py @@ -28,7 +28,7 @@ def final_cleanup(): # Load test pipeline files from test directory -TEST_FILES_DIR = Path(__file__).parent / "test_files/python/chat_with_website" +TEST_FILES_DIR = Path(__file__).parent / "test_files/files/chat_with_website" PIPELINE_FILES = { "pipeline_wrapper.py": (TEST_FILES_DIR / "pipeline_wrapper.py").read_text(), "chat_with_website.yml": (TEST_FILES_DIR / "chat_with_website.yml").read_text(), From 32c02662e4ecc24b605be0021d2c3b8c27cb6b15 Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Fri, 24 Jan 2025 17:58:48 +0100 Subject: [PATCH 19/23] Fix run_api args ; Update docstrings --- .../server/utils/base_pipeline_wrapper.py | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/src/hayhooks/server/utils/base_pipeline_wrapper.py b/src/hayhooks/server/utils/base_pipeline_wrapper.py index 87a3b75..85d6525 100644 --- a/src/hayhooks/server/utils/base_pipeline_wrapper.py +++ b/src/hayhooks/server/utils/base_pipeline_wrapper.py @@ -9,38 +9,39 @@ def __init__(self): @abstractmethod def setup(self) -> None: """ - Setup the pipeline. + Initialize and configure the pipeline. - This method should be called before using the pipeline. + This method will be called before any pipeline operations. + It should initialize `self.pipeline` with the appropriate pipeline. + + Pipelines can be loaded from YAML or provided directly as code. """ pass @abstractmethod - def run_api(self, urls: List[str], question: str) -> dict: + def run_api(self): """ - Run the pipeline in API mode. + Execute the pipeline in API mode. - Args: - urls: List of URLs to fetch content from - question: Question to be answered + This method provides a generic interface for running the pipeline + with implementation-specific parameters. - Returns: - dict: Pipeline execution results + An API endpoint will call this method and will use dynamically created + pydantic models for request and response validation. """ pass @abstractmethod - def run_chat(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> dict: + def run_chat(self, model_id: str, messages: List[dict], body: dict): """ - Run the pipeline in chat mode. + This method is called when a user sends an OpenAI-compatible chat completion request. - Args: - user_message: Message from the user - model_id: ID of the model to use - messages: List of previous messages - body: Additional request body parameters + This method handles conversational interactions with the pipeline, + maintaining context and processing chat-specific parameters. - Returns: - dict: Pipeline execution results + Args: + model_id: The model (Haystack pipeline) to run + messages: List of previous conversation messages for context + body: Additional parameters and configuration options """ pass From 5986cfa4a2d9bef84d1cb69853c34574c2aea6e2 Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Mon, 27 Jan 2025 11:29:49 +0100 Subject: [PATCH 20/23] At least one between run_chat and run_api should be implemented ; Added checks ; Update tests --- .../server/utils/base_pipeline_wrapper.py | 6 +- src/hayhooks/server/utils/deploy_utils.py | 108 +++++++++++------- tests/test_deploy_utils.py | 62 ++++++++++ .../files/missing_methods/pipeline_wrapper.py | 6 + .../files/setup_error/pipeline_wrapper.py | 8 ++ tests/test_it_deploy_files.py | 50 ++++++-- 6 files changed, 183 insertions(+), 57 deletions(-) create mode 100644 tests/test_files/files/missing_methods/pipeline_wrapper.py create mode 100644 tests/test_files/files/setup_error/pipeline_wrapper.py diff --git a/src/hayhooks/server/utils/base_pipeline_wrapper.py b/src/hayhooks/server/utils/base_pipeline_wrapper.py index 85d6525..b597bf3 100644 --- a/src/hayhooks/server/utils/base_pipeline_wrapper.py +++ b/src/hayhooks/server/utils/base_pipeline_wrapper.py @@ -18,7 +18,6 @@ def setup(self) -> None: """ pass - @abstractmethod def run_api(self): """ Execute the pipeline in API mode. @@ -29,9 +28,8 @@ def run_api(self): An API endpoint will call this method and will use dynamically created pydantic models for request and response validation. """ - pass + raise NotImplementedError("run_api not implemented") - @abstractmethod def run_chat(self, model_id: str, messages: List[dict], body: dict): """ This method is called when a user sends an OpenAI-compatible chat completion request. @@ -44,4 +42,4 @@ def run_chat(self, model_id: str, messages: List[dict], body: dict): messages: List of previous conversation messages for context body: Additional parameters and configuration options """ - pass + raise NotImplementedError("run_chat not implemented") diff --git a/src/hayhooks/server/utils/deploy_utils.py b/src/hayhooks/server/utils/deploy_utils.py index 84f6061..0e2089a 100644 --- a/src/hayhooks/server/utils/deploy_utils.py +++ b/src/hayhooks/server/utils/deploy_utils.py @@ -21,6 +21,7 @@ get_response_model, ) from hayhooks.server.logger import log +from hayhooks.server.utils.base_pipeline_wrapper import BasePipelineWrapper from hayhooks.settings import settings from pydantic import BaseModel, create_model @@ -229,7 +230,7 @@ def deploy_pipeline_files(app: FastAPI, pipeline_name: str, files: dict[str, str module = load_pipeline_module(pipeline_name, folder_path=pipeline_dir) clog.debug("Creating PipelineWrapper instance") - pipeline_wrapper = module.PipelineWrapper() + pipeline_wrapper = create_pipeline_wrapper_instance(module) clog.debug("Running setup()") pipeline_wrapper.setup() @@ -237,46 +238,48 @@ def deploy_pipeline_files(app: FastAPI, pipeline_name: str, files: dict[str, str clog.debug("Adding pipeline to registry") registry.add(pipeline_name, pipeline_wrapper) - clog.debug("Creating dynamic Pydantic models for run_api") - RunRequest = create_request_model_from_callable(pipeline_wrapper.run_api, f'{pipeline_name}Run') - RunResponse = create_response_model_from_callable(pipeline_wrapper.run_api, f'{pipeline_name}Run') - - clog.debug("Adding new API endpoints") - - @handle_pipeline_exceptions() - async def run_endpoint(run_req: RunRequest) -> JSONResponse: # type: ignore - result = await run_in_threadpool(pipeline_wrapper.run_api, urls=run_req.urls, question=run_req.question) - return JSONResponse({"result": result}, status_code=200) - - @handle_pipeline_exceptions() - async def chat_endpoint(chat_req: ChatRequest) -> JSONResponse: - result = await run_in_threadpool( - pipeline_wrapper.run_chat, - user_message=chat_req.user_message, - model_id=chat_req.model_id, - messages=chat_req.messages, - body=chat_req.body, - ) - return JSONResponse({"result": result}, status_code=200) + if pipeline_wrapper._has_run_api: + clog.debug("Creating dynamic Pydantic models for run_api") - # Add routes - app.add_api_route( - path=f"/{pipeline_name}/run", - endpoint=run_endpoint, - methods=["POST"], - name=f"{pipeline_name}_run", - response_model=RunResponse, - tags=["pipelines"], - ) + RunRequest = create_request_model_from_callable(pipeline_wrapper.run_api, f'{pipeline_name}Run') + RunResponse = create_response_model_from_callable(pipeline_wrapper.run_api, f'{pipeline_name}Run') - app.add_api_route( - path=f"/{pipeline_name}/chat", - endpoint=chat_endpoint, - methods=["POST"], - name=f"{pipeline_name}_chat", - response_model=ChatResponse, - tags=["pipelines"], - ) + @handle_pipeline_exceptions() + async def run_endpoint(run_req: RunRequest) -> JSONResponse: # type: ignore + result = await run_in_threadpool(pipeline_wrapper.run_api, urls=run_req.urls, question=run_req.question) + return JSONResponse({"result": result}, status_code=200) + + app.add_api_route( + path=f"/{pipeline_name}/run", + endpoint=run_endpoint, + methods=["POST"], + name=f"{pipeline_name}_run", + response_model=RunResponse, + tags=["pipelines"], + ) + + if pipeline_wrapper._has_run_chat: + clog.debug("Creating dynamic Pydantic models for run_chat") + + @handle_pipeline_exceptions() + async def chat_endpoint(chat_req: ChatRequest) -> JSONResponse: + result = await run_in_threadpool( + pipeline_wrapper.run_chat, + user_message=chat_req.user_message, + model_id=chat_req.model_id, + messages=chat_req.messages, + body=chat_req.body, + ) + return JSONResponse({"result": result}, status_code=200) + + app.add_api_route( + path=f"/{pipeline_name}/chat", + endpoint=chat_endpoint, + methods=["POST"], + name=f"{pipeline_name}_chat", + response_model=ChatResponse, + tags=["pipelines"], + ) clog.debug("Setting up FastAPI app") app.openapi_schema = None @@ -287,6 +290,31 @@ async def chat_endpoint(chat_req: ChatRequest) -> JSONResponse: return {"name": pipeline_name} +def create_pipeline_wrapper_instance(pipeline_module: ModuleType) -> BasePipelineWrapper: + try: + pipeline_wrapper = pipeline_module.PipelineWrapper() + except Exception as e: + raise PipelineWrapperError(f"Failed to create pipeline wrapper instance: {str(e)}") from e + + try: + pipeline_wrapper.setup() + except Exception as e: + raise PipelineWrapperError(f"Failed to call setup() on pipeline wrapper instance: {str(e)}") from e + + has_run_api = pipeline_wrapper.run_api.__func__ is not BasePipelineWrapper.run_api + if has_run_api: + pipeline_wrapper._has_run_api = True + + has_run_chat = pipeline_wrapper.run_chat.__func__ is not BasePipelineWrapper.run_chat + if has_run_chat: + pipeline_wrapper._has_run_chat = True + + if not (has_run_api or has_run_chat): + raise PipelineWrapperError("At least one of run_api or run_chat must be implemented") + + return pipeline_wrapper + + def read_pipeline_files_from_folder(folder_path: Path) -> dict[str, str]: """Read pipeline files from a folder and return a dictionary mapping filenames to their contents. Skips directories, hidden files, and common Python artifacts. @@ -300,11 +328,9 @@ def read_pipeline_files_from_folder(folder_path: Path) -> dict[str, str]: files = {} for file_path in folder_path.rglob("*"): - # Skip directories and hidden files if file_path.is_dir() or file_path.name.startswith('.'): continue - # Skip files matching ignore patterns if any(file_path.match(pattern) for pattern in settings.files_to_ignore_patterns): continue diff --git a/tests/test_deploy_utils.py b/tests/test_deploy_utils.py index b1b7ed5..ca30512 100644 --- a/tests/test_deploy_utils.py +++ b/tests/test_deploy_utils.py @@ -1,5 +1,6 @@ import pytest import shutil +from haystack import Pipeline from pathlib import Path from typing import Callable from hayhooks.server.utils.deploy_utils import ( @@ -7,11 +8,14 @@ save_pipeline_files, create_request_model_from_callable, create_response_model_from_callable, + create_pipeline_wrapper_instance, ) from hayhooks.server.exceptions import ( PipelineFilesError, PipelineModuleLoadError, + PipelineWrapperError, ) +from hayhooks.server.utils.base_pipeline_wrapper import BasePipelineWrapper TEST_PIPELINES_DIR = Path("tests/test_files/test_pipelines") @@ -122,3 +126,61 @@ def sample_func() -> dict: assert model.__name__ == "TestResponse" assert model.model_fields["result"].annotation == dict assert model.model_fields["result"].is_required + + +def test_create_pipeline_wrapper_instance_success(): + class ValidPipelineWrapper(BasePipelineWrapper): + def setup(self): + self.pipeline = Pipeline() + + def run_api(self): + pass + + def run_chat(self, model_id, messages, body): + pass + + module = type('Module', (), {'PipelineWrapper': ValidPipelineWrapper}) + + wrapper = create_pipeline_wrapper_instance(module) + assert isinstance(wrapper, BasePipelineWrapper) + assert hasattr(wrapper, 'run_api') + assert hasattr(wrapper, 'run_chat') + assert isinstance(wrapper.pipeline, Pipeline) + + +def test_create_pipeline_wrapper_instance_init_error(): + class BrokenPipelineWrapper: + def __init__(self): + raise ValueError("Init error") + + module = type('Module', (), {'PipelineWrapper': BrokenPipelineWrapper}) + + with pytest.raises(PipelineWrapperError, match="Failed to create pipeline wrapper instance: Init error"): + create_pipeline_wrapper_instance(module) + + +def test_create_pipeline_wrapper_instance_setup_error(): + class BrokenSetupWrapper(BasePipelineWrapper): + def setup(self): + raise ValueError("Setup error") + + def run_api(self): + pass + + module = type('Module', (), {'PipelineWrapper': BrokenSetupWrapper}) + + with pytest.raises( + PipelineWrapperError, match="Failed to call setup\\(\\) on pipeline wrapper instance: Setup error" + ): + create_pipeline_wrapper_instance(module) + + +def test_create_pipeline_wrapper_instance_missing_methods(): + class IncompleteWrapper(BasePipelineWrapper): + def setup(self): + self.pipeline = Pipeline() + + module = type('Module', (), {'PipelineWrapper': IncompleteWrapper}) + + with pytest.raises(PipelineWrapperError, match="At least one of run_api or run_chat must be implemented"): + create_pipeline_wrapper_instance(module) diff --git a/tests/test_files/files/missing_methods/pipeline_wrapper.py b/tests/test_files/files/missing_methods/pipeline_wrapper.py new file mode 100644 index 0000000..c9ddcc1 --- /dev/null +++ b/tests/test_files/files/missing_methods/pipeline_wrapper.py @@ -0,0 +1,6 @@ +from hayhooks.server.utils.base_pipeline_wrapper import BasePipelineWrapper +from haystack import Pipeline + +class PipelineWrapper(BasePipelineWrapper): + def setup(self): + self.pipeline = Pipeline() diff --git a/tests/test_files/files/setup_error/pipeline_wrapper.py b/tests/test_files/files/setup_error/pipeline_wrapper.py new file mode 100644 index 0000000..f0cb2b0 --- /dev/null +++ b/tests/test_files/files/setup_error/pipeline_wrapper.py @@ -0,0 +1,8 @@ +from hayhooks.server.utils.base_pipeline_wrapper import BasePipelineWrapper + +class PipelineWrapper(BasePipelineWrapper): + def setup(self): + raise ValueError("Setup failed!") + + def run_api(self): + return {"result": "This should never be reached"} diff --git a/tests/test_it_deploy_files.py b/tests/test_it_deploy_files.py index 8405eec..c1d9386 100644 --- a/tests/test_it_deploy_files.py +++ b/tests/test_it_deploy_files.py @@ -1,10 +1,10 @@ import pytest +import shutil from fastapi.testclient import TestClient from hayhooks.server import app from pathlib import Path from hayhooks.server.pipelines.registry import registry from hayhooks.settings import settings -import shutil client = TestClient(app) @@ -27,16 +27,19 @@ def final_cleanup(): cleanup() -# Load test pipeline files from test directory -TEST_FILES_DIR = Path(__file__).parent / "test_files/files/chat_with_website" -PIPELINE_FILES = { - "pipeline_wrapper.py": (TEST_FILES_DIR / "pipeline_wrapper.py").read_text(), - "chat_with_website.yml": (TEST_FILES_DIR / "chat_with_website.yml").read_text(), +TEST_FILES_DIR = Path(__file__).parent / "test_files/files" +VALID_PIPELINE_DIR = TEST_FILES_DIR / "chat_with_website" +MISSING_METHODS_DIR = TEST_FILES_DIR / "missing_methods" +SETUP_ERROR_DIR = TEST_FILES_DIR / "setup_error" + +SAMPLE_PIPELINE_FILES = { + "pipeline_wrapper.py": (VALID_PIPELINE_DIR / "pipeline_wrapper.py").read_text(), + "chat_with_website.yml": (VALID_PIPELINE_DIR / "chat_with_website.yml").read_text(), } def test_deploy_files_ok(status_pipeline): - pipeline_data = {"name": "test_pipeline", "files": PIPELINE_FILES} + pipeline_data = {"name": "test_pipeline", "files": SAMPLE_PIPELINE_FILES} response = client.post("/deploy_files", json=pipeline_data) assert response.status_code == 200 @@ -53,7 +56,7 @@ def test_deploy_files_ok(status_pipeline): def test_deploy_files_missing_wrapper(): - pipeline_data = {"name": "test_pipeline", "files": PIPELINE_FILES.copy()} + pipeline_data = {"name": "test_pipeline", "files": SAMPLE_PIPELINE_FILES.copy()} pipeline_data["files"].pop("pipeline_wrapper.py") response = client.post("/deploy_files", json=pipeline_data) @@ -64,7 +67,7 @@ def test_deploy_files_missing_wrapper(): def test_deploy_files_invalid_wrapper(): invalid_files = { "pipeline_wrapper.py": "invalid python code", - "chat_with_website.yml": PIPELINE_FILES["chat_with_website.yml"], + "chat_with_website.yml": SAMPLE_PIPELINE_FILES["chat_with_website.yml"], } response = client.post("/deploy_files", json={"name": "test_pipeline", "files": invalid_files}) @@ -73,16 +76,16 @@ def test_deploy_files_invalid_wrapper(): def test_deploy_files_duplicate_pipeline(): - response = client.post("/deploy_files", json={"name": "test_pipeline", "files": PIPELINE_FILES}) + response = client.post("/deploy_files", json={"name": "test_pipeline", "files": SAMPLE_PIPELINE_FILES}) assert response.status_code == 200 - response = client.post("/deploy_files", json={"name": "test_pipeline", "files": PIPELINE_FILES}) + response = client.post("/deploy_files", json={"name": "test_pipeline", "files": SAMPLE_PIPELINE_FILES}) assert response.status_code == 409 assert "Pipeline 'test_pipeline' already exists" in response.json()["detail"] def test_pipeline_endpoint_error_handling(): - pipeline_data = {"name": "test_pipeline", "files": PIPELINE_FILES} + pipeline_data = {"name": "test_pipeline", "files": SAMPLE_PIPELINE_FILES} response = client.post("/deploy_files", json=pipeline_data) assert response.status_code == 200 @@ -93,3 +96,26 @@ def test_pipeline_endpoint_error_handling(): ) assert run_response.status_code == 500 assert "Pipeline execution failed" in run_response.json()["detail"] + + +def test_deploy_files_missing_required_methods(): + invalid_files = { + "pipeline_wrapper.py": (MISSING_METHODS_DIR / "pipeline_wrapper.py").read_text(), + "chat_with_website.yml": SAMPLE_PIPELINE_FILES["chat_with_website.yml"], + } + + response = client.post("/deploy_files", json={"name": "test_pipeline", "files": invalid_files}) + print(response.json()) + assert response.status_code == 422 + assert "At least one of run_api or run_chat must be implemented" in response.json()["detail"] + + +def test_deploy_files_setup_error(): + invalid_files = { + "pipeline_wrapper.py": (SETUP_ERROR_DIR / "pipeline_wrapper.py").read_text(), + "chat_with_website.yml": SAMPLE_PIPELINE_FILES["chat_with_website.yml"], + } + + response = client.post("/deploy_files", json={"name": "test_pipeline", "files": invalid_files}) + assert response.status_code == 422 + assert "Failed to call setup() on pipeline wrapper instance: Setup failed!" in response.json()["detail"] From 49af0628b168975c3c265d88361e93ee15cdb18d Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Mon, 27 Jan 2025 14:00:41 +0100 Subject: [PATCH 21/23] Updated docstrings --- src/hayhooks/server/utils/base_pipeline_wrapper.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/hayhooks/server/utils/base_pipeline_wrapper.py b/src/hayhooks/server/utils/base_pipeline_wrapper.py index b597bf3..1608fad 100644 --- a/src/hayhooks/server/utils/base_pipeline_wrapper.py +++ b/src/hayhooks/server/utils/base_pipeline_wrapper.py @@ -25,8 +25,10 @@ def run_api(self): This method provides a generic interface for running the pipeline with implementation-specific parameters. - An API endpoint will call this method and will use dynamically created - pydantic models for request and response validation. + This method will be used as the handler for the `/run` API endpoint. + + Pydantic models will be automatically generated based on this method's + signature and return type for request validation and response serialization. """ raise NotImplementedError("run_api not implemented") @@ -37,9 +39,11 @@ def run_chat(self, model_id: str, messages: List[dict], body: dict): This method handles conversational interactions with the pipeline, maintaining context and processing chat-specific parameters. + This method will be used as the handler for the `/chat` API endpoint. + Args: - model_id: The model (Haystack pipeline) to run - messages: List of previous conversation messages for context + model_id: The `name` of the deployed Haystack pipeline to run + messages: The history of messages as OpenAI-compatible list of dicts body: Additional parameters and configuration options """ raise NotImplementedError("run_chat not implemented") From da427f20c864c7c76f68ebb9172361968349180f Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Mon, 27 Jan 2025 15:44:15 +0100 Subject: [PATCH 22/23] Refactoring --- src/hayhooks/server/utils/deploy_utils.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/hayhooks/server/utils/deploy_utils.py b/src/hayhooks/server/utils/deploy_utils.py index 0e2089a..cb05b42 100644 --- a/src/hayhooks/server/utils/deploy_utils.py +++ b/src/hayhooks/server/utils/deploy_utils.py @@ -238,7 +238,7 @@ def deploy_pipeline_files(app: FastAPI, pipeline_name: str, files: dict[str, str clog.debug("Adding pipeline to registry") registry.add(pipeline_name, pipeline_wrapper) - if pipeline_wrapper._has_run_api: + if pipeline_wrapper._is_run_api_implemented: clog.debug("Creating dynamic Pydantic models for run_api") RunRequest = create_request_model_from_callable(pipeline_wrapper.run_api, f'{pipeline_name}Run') @@ -258,7 +258,7 @@ async def run_endpoint(run_req: RunRequest) -> JSONResponse: # type: ignore tags=["pipelines"], ) - if pipeline_wrapper._has_run_chat: + if pipeline_wrapper._is_run_chat_implemented: clog.debug("Creating dynamic Pydantic models for run_chat") @handle_pipeline_exceptions() @@ -301,15 +301,10 @@ def create_pipeline_wrapper_instance(pipeline_module: ModuleType) -> BasePipelin except Exception as e: raise PipelineWrapperError(f"Failed to call setup() on pipeline wrapper instance: {str(e)}") from e - has_run_api = pipeline_wrapper.run_api.__func__ is not BasePipelineWrapper.run_api - if has_run_api: - pipeline_wrapper._has_run_api = True + pipeline_wrapper._is_run_api_implemented = pipeline_wrapper.run_api.__func__ is not BasePipelineWrapper.run_api + pipeline_wrapper._is_run_chat_implemented = pipeline_wrapper.run_chat.__func__ is not BasePipelineWrapper.run_chat - has_run_chat = pipeline_wrapper.run_chat.__func__ is not BasePipelineWrapper.run_chat - if has_run_chat: - pipeline_wrapper._has_run_chat = True - - if not (has_run_api or has_run_chat): + if not (pipeline_wrapper._is_run_api_implemented or pipeline_wrapper._is_run_chat_implemented): raise PipelineWrapperError("At least one of run_api or run_chat must be implemented") return pipeline_wrapper From 67801e0021aa40b949009401bc04256284ffcb80 Mon Sep 17 00:00:00 2001 From: Michele Pangrazzi Date: Mon, 27 Jan 2025 15:51:37 +0100 Subject: [PATCH 23/23] Add comment reference --- src/hayhooks/server/utils/deploy_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/hayhooks/server/utils/deploy_utils.py b/src/hayhooks/server/utils/deploy_utils.py index cb05b42..ae4ce09 100644 --- a/src/hayhooks/server/utils/deploy_utils.py +++ b/src/hayhooks/server/utils/deploy_utils.py @@ -245,6 +245,7 @@ def deploy_pipeline_files(app: FastAPI, pipeline_name: str, files: dict[str, str RunResponse = create_response_model_from_callable(pipeline_wrapper.run_api, f'{pipeline_name}Run') @handle_pipeline_exceptions() + # See comment on pipeline_run() for explanation of the "type: ignore" below async def run_endpoint(run_req: RunRequest) -> JSONResponse: # type: ignore result = await run_in_threadpool(pipeline_wrapper.run_api, urls=run_req.urls, question=run_req.question) return JSONResponse({"result": result}, status_code=200)