
New pipeline deployment system #58

Merged
merged 23 commits into main from new_deploy on Jan 27, 2025

Conversation


@mpangrazzi mpangrazzi commented Jan 20, 2025

TODO

  • add save_pipeline_files to save pipeline files in a folder
  • add load_pipeline_module to load PipelineWrapper from new saved module
  • make registry support PipelineWrapper as well as Pipeline
  • create a new deployment method which creates 2 new API routes for each pipeline (one will run run_api, the other run_chat)
  • update the current /deploy route to support the new deployment system and add a new /deploy_files route
  • add/update tests
  • handle exceptions during pipeline execution in dynamically added API routes
  • add deploy-files CLI command
  • ensure pipeline loading logic at startup supports new deployment system
  • manual tests
  • add a comment with a small demo

TODO after reviews

  • fix the abstract BasePipelineWrapper run_api signature
  • only one of run_api and run_chat must be implemented (it's not mandatory to have both)

Goals

This adds a new pipeline deployment system based on pipeline files (heavily inspired by open-webui pipelines).

This has multiple goals:

  • Making deployment more flexible: a pipeline can be provided as YAML (and loaded in the wrapper class) or directly as code.
  • A new run_api method needs to be implemented, which will contain the code needed to run the pipeline. The method's input arguments will be converted into a JSON-compatible Pydantic model, which will be used as the API route input (see the sketch after this list). This way it will be easier to run pipelines, since users expose exactly the input fields they need (rather than trying to model all pipeline components' inputs/outputs).
  • A new run_chat method can be implemented, which will receive an OpenAI-compatible user message as input. This will make it easy to use hayhooks as a custom backend for open-webui. Note: open-webui support will be added in a different PR.
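
To illustrate the signature-to-model conversion described above, here's a minimal sketch (build_request_model is a hypothetical helper name, and the exact mechanism is an assumption, not necessarily how hayhooks implements it):

import inspect
from typing import List

from pydantic import create_model


def build_request_model(func) -> type:
    """Hypothetical helper: build a Pydantic model mirroring a function's parameters."""
    fields = {}
    for name, param in inspect.signature(func).parameters.items():
        if name == "self":
            continue
        annotation = param.annotation if param.annotation is not inspect.Parameter.empty else str
        default = param.default if param.default is not inspect.Parameter.empty else ...
        fields[name] = (annotation, default)
    return create_model("RunRequest", **fields)


def run_api(self, urls: List[str], question: str) -> dict:
    ...


# RunRequest now has the fields `urls: List[str]` and `question: str`,
# ready to be used as the POST body schema of the generated route.
RunRequest = build_request_model(run_api)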

Description

The main idea is to let users provide a pipeline wrapper class when deploying a pipeline, like the following:

from pathlib import Path
from typing import List
from haystack import Pipeline
from hayhooks.server.utils.base_pipeline_wrapper import BasePipelineWrapper


URLS = ["https://haystack.deepset.ai", "https://www.redis.io"]


class PipelineWrapper(BasePipelineWrapper):
    def setup(self) -> None:
        pipeline_yaml = (Path(__file__).parent / "chat_with_website.yml").read_text()
        self.pipeline = Pipeline.loads(pipeline_yaml)

    def run_api(self, urls: List[str], question: str) -> dict:
        return self.pipeline.run({"fetcher": {"urls": urls}, "prompt": {"query": question}})

    def run_chat(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> dict:
        question = user_message
        return self.pipeline.run({"fetcher": {"urls": URLS}, "prompt": {"query": question}})

Where BasePipelineWrapper is:

from abc import ABC, abstractmethod
from typing import List


class BasePipelineWrapper(ABC):
    def __init__(self):
        self.pipeline = None

    @abstractmethod
    def setup(self) -> None:
        """
        Setup the pipeline.

        This method should be called before using the pipeline.
        """
        pass

    @abstractmethod
    def run_api(self, urls: List[str], question: str) -> dict:
        """
        Run the pipeline in API mode.

        Args:
            urls: List of URLs to fetch content from
            question: Question to be answered

        Returns:
            dict: Pipeline execution results
        """
        pass

    @abstractmethod
    def run_chat(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> dict:
        """
        Run the pipeline in chat mode.

        Args:
            user_message: Message from the user
            model_id: ID of the model to use
            messages: List of previous messages
            body: Additional request body parameters

        Returns:
            dict: Pipeline execution results
        """
        pass

In the example above, the user will call the /deploy_files route and provide both the pipeline's YAML file and the pipeline_wrapper.py file. The files will be saved in a folder, pipeline_wrapper.py will be loaded as a module, and two API routes will be added dynamically (one for chat runs, one for API runs), roughly as sketched below.
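
Here's a minimal sketch of that flow. It is illustrative only: load_pipeline_module comes from the TODO list above, but its body and the add_pipeline_routes helper are assumptions, and the Pydantic request-model conversion from the Goals section is elided:

import importlib.util
from pathlib import Path

from fastapi import FastAPI


def load_pipeline_module(pipeline_name: str, folder: Path):
    """Import the saved pipeline_wrapper.py as a regular Python module."""
    spec = importlib.util.spec_from_file_location(pipeline_name, folder / "pipeline_wrapper.py")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def add_pipeline_routes(app: FastAPI, pipeline_name: str, folder: Path) -> None:
    module = load_pipeline_module(pipeline_name, folder)
    wrapper = module.PipelineWrapper()
    wrapper.setup()
    # In practice run_api/run_chat would be wrapped so that their signatures
    # become Pydantic request models (see Goals) before being registered.
    app.add_api_route(f"/{pipeline_name}/run", wrapper.run_api, methods=["POST"])
    app.add_api_route(f"/{pipeline_name}/chat", wrapper.run_chat, methods=["POST"])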

A full end-to-end example will be provided below.

@mpangrazzi mpangrazzi changed the title from "New deploy" to "New pipeline deployment system" on Jan 20, 2025
@mpangrazzi mpangrazzi self-assigned this Jan 23, 2025

mpangrazzi commented Jan 23, 2025

Let's assume we want to deploy the chat_with_website pipeline using the proposed new deployment method.

First, we need to launch the server. We're also showing debug logs in this example by setting the LOG=debug environment variable.

LOG=debug hayhooks server

Deploy the pipeline

We have 3 ways to deploy the pipeline:

  • Deploy using the deploy-files CLI command
  • Deploy using the /deploy-files API endpoint
  • Automatically deploy the pipeline by copying the files into the pipelines/chat_with_website folder, making sure the pipelines_dir setting is set to pipelines.

Deploy using the deploy-files CLI command

First, we need to create a local folder with the pipeline files.

mkdir -p input_files/chat_with_website

Then, we copy the chat_with_website.yml file to the input_files/chat_with_website folder.
Next, we create the pipeline_wrapper.py file in the same folder, which will look like this:

from pathlib import Path
from typing import List
from haystack import Pipeline
from hayhooks.server.utils.base_pipeline_wrapper import BasePipelineWrapper
from hayhooks.server.logger import log


URLS = ["https://haystack.deepset.ai", "https://www.redis.io"]


class PipelineWrapper(BasePipelineWrapper):
    def setup(self) -> None:
        pipeline_yaml = (Path(__file__).parent / "chat_with_website.yml").read_text()
        self.pipeline = Pipeline.loads(pipeline_yaml)

    def run_api(self, urls: List[str], question: str) -> str:
        log.trace(f"Running pipeline with urls: {urls} and question: {question}")
        result = self.pipeline.run({"fetcher": {"urls": urls}, "prompt": {"query": question}})
        return result["llm"]["replies"][0]

    def run_chat(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> str:
        log.trace(
            f"Running pipeline with user_message: {user_message}, model_id: {model_id}, messages: {messages}, body: {body}"
        )
        question = user_message
        result = self.pipeline.run({"fetcher": {"urls": URLS}, "prompt": {"query": question}})
        return result["llm"]["replies"][0]

Some notes on this:

  • The setup() method is used to initialize the pipeline. Since we provided the YAML file in the input_files/chat_with_website folder, we can load it directly from there.
  • The run_api() method is used to run the pipeline when the user sends a request to the /{pipeline_name}/run endpoint. Here, we're using only urls and question as inputs (chosen by the user).
  • The run_chat() method is used to run the pipeline when the user sends a message to the /{pipeline_name}/chat endpoint. Here, we're using user_message, model_id, messages and body as inputs (fixed). Also, since the chat endpoint has only one free-form input (the user's message), we're hardcoding the urls pipeline input param here.

Now, we can deploy the pipeline using the deploy-file CLI command.

LOG=debug hayhooks deploy-files -n chat_with_website input_files/chat_with_website

This CLI command will basically call the /deploy-files API endpoint with the files in the input_files/chat_with_website folder.

You should see the following output:

[Screenshot: output of the deploy-files CLI command]

Deploy using the /deploy-files API endpoint

You can also use the /deploy-files API endpoint directly. You can check the /docs endpoint to see how to use it. But basically, you need to send a POST request to /deploy-files with the following body:

{
  "name": "chat_with_website",
  "files": [
    {
      "pipeline_wrapper.py": "import ...",
      "chat_with_website.yml": "components: ..."
    }
  ]
}
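
For example, with Python's requests (a hypothetical client-side sketch mirroring what the deploy-files CLI command does; the server URL is an assumption, adjust it to your setup):

from pathlib import Path

import requests

folder = Path("input_files/chat_with_website")
# Read every file in the folder into a {filename: content} mapping
files = {f.name: f.read_text() for f in folder.iterdir() if f.is_file()}

resp = requests.post(
    "http://localhost:1416/deploy-files",  # assumed hayhooks host/port
    json={"name": "chat_with_website", "files": [files]},
)
print(resp.status_code, resp.json())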

Automatically deploy the pipeline at startup (recommended)

You can also automatically deploy the pipeline by copying the files into the pipelines/chat_with_website folder (or into the corresponding subfolder of whatever folder you specified as the pipelines_dir setting).

In this case, the pipeline will be deployed automatically when you start the server. This is the recommended way to deploy pipelines, since if you run the FastAPI app with multiple workers, the pipeline will be deployed in all of them (see also docs/deployment_guidelines).
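
For example, assuming pipelines_dir is set to pipelines and reusing the input files from the CLI example above:

mkdir -p pipelines/chat_with_website
cp input_files/chat_with_website/chat_with_website.yml pipelines/chat_with_website/
cp input_files/chat_with_website/pipeline_wrapper.py pipelines/chat_with_website/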

Run the pipeline

Using the /{pipeline_name}/run endpoint

To run the pipeline, you can use the dynamically generated API endpoints. We will focus on the /{pipeline_name}/run endpoint, which will basically call the run_api method in the pipeline_wrapper.py file.

Since the run_api method has the following signature:

def run_api(self, urls: List[str], question: str) -> str:

You need to send a POST request to the /{pipeline_name}/run endpoint with the following body:

{
  "urls": ["https://haystack.deepset.ai", "https://www.redis.io"],
  "question": "What is the main purpose of Haystack?"
}
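
For example, with requests (again a hypothetical client-side sketch; the server URL is an assumption):

import requests

resp = requests.post(
    "http://localhost:1416/chat_with_website/run",  # assumed hayhooks host/port
    json={
        "urls": ["https://haystack.deepset.ai", "https://www.redis.io"],
        "question": "What is the main purpose of Haystack?",
    },
)
print(resp.json())  # the reply returned by run_api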

Note that if you need to add more inputs (which you may want to pass to pipeline.run(), or use for any other reason), you simply have to change the method signature, undeploy and redeploy the pipeline, and of course add the additional inputs to your POST request body.

For convenience, you can also use the /docs endpoint and try it out easily.

[Screenshot: trying out the /{pipeline_name}/run endpoint from the /docs page]

Using the /{pipeline_name}/chat endpoint

This will be covered in another PR!

@mpangrazzi mpangrazzi marked this pull request as ready for review January 23, 2025 17:09
@mpangrazzi mpangrazzi removed the request for review from bilgeyucel January 23, 2025 17:15
    pass

    @abstractmethod
    def run_chat(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> dict:

Member:

user_message and messages sound confusing to me...
user_message and history would be better, for example.
But maybe there is some reason I'm overlooking...


Let's also discuss support/conversion of Haystack ChatMessage.

Contributor (Author):

The incoming request looks like this one, but I agree: user_message (which is the last message) and history would be less confusing.

Member:

As a dev, can I still return a generator here? Or somewhere else?

@vblagoje (Member) left a comment:

Great work man, looks almost there. A few things were not immediately clear to me, so I left a few comments, and the code could use a few comments here and there as well. What about metadata for PipelineWrapper, e.g. third-party lib dependencies, so we can load and import them automatically :-)

"""
Run the pipeline in chat mode.

Args:

Member:

Watch out for Google pydoc


Contributor (Author):

You're correct about run_api. I am about to rewrite base_pipeline_wrapper docstrings because they're not very precise.


mpangrazzi commented Jan 24, 2025

@vblagoje

About run_chat: yes, my idea is to add a utility to hayhooks which will help you return a generator (internally using streaming_callback for now; see the sketch below). Then it should be easy to switch to a simpler mechanism once we implement it.

I have this covered in the next PR!
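
To illustrate the idea, a rough sketch of such a utility (the streaming_generator name, the "llm" component name, and the queue/thread bridging are all assumptions, not the actual hayhooks implementation):

from queue import Queue
from threading import Thread


def streaming_generator(pipeline, pipeline_input: dict):
    """Bridge Haystack's streaming_callback into a plain generator."""
    queue: Queue = Queue()
    done = object()  # sentinel marking the end of the stream

    def on_chunk(chunk):
        queue.put(chunk.content)  # StreamingChunk.content holds the token text

    def run():
        try:
            # Assumes the pipeline's generator component is named "llm"
            pipeline.get_component("llm").streaming_callback = on_chunk
            pipeline.run(pipeline_input)
        finally:
            queue.put(done)

    Thread(target=run, daemon=True).start()
    while (item := queue.get()) is not done:
        yield item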

> What about metadata for PipelineWrapper, e.g. third-party lib dependencies, so we can load and import them automatically :-)

Of course! I was planning to support additional requirements in a separate PR (this is already quite big!) ;)


mpangrazzi commented Jan 27, 2025

@anakin87 @vblagoje Updates:

  • Now at least one of run_api and run_chat must be implemented
  • Added checks (e.g. self.pipeline must be a Haystack Pipeline instance after calling setup(); see the sketch after this list)
  • Updated tests
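
A minimal sketch of that check (illustrative; the actual error type and message in hayhooks may differ):

from haystack import Pipeline


def validate_wrapper(wrapper) -> None:
    wrapper.setup()
    if not isinstance(wrapper.pipeline, Pipeline):
        raise ValueError("setup() must set self.pipeline to a haystack.Pipeline instance")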

I think this should be enough for this PR. The next one will update the run_chat implementation and add OpenAI-compatible endpoints. But of course, I'll wait for your review first ;)

@mpangrazzi mpangrazzi requested a review from anakin87 January 27, 2025 14:51
@anakin87 (Member) left a comment:

Given that we will take care of run_chat in a future PR,
this PR looks good to me!

I would be happy if @vblagoje could also take a look.

@mpangrazzi mpangrazzi requested a review from vblagoje January 27, 2025 14:57
@vblagoje (Member) left a comment:

Solid foundation; looking forward to the upcoming PRs that complete this idea/effort.

@mpangrazzi mpangrazzi merged commit daa0c18 into main Jan 27, 2025
4 checks passed
@mpangrazzi mpangrazzi deleted the new_deploy branch January 27, 2025 15:44